You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:36 UTC
[11/50] [abbrv] beam git commit: [BEAM-972] Add unit tests to
Gearpump runner
[BEAM-972] Add unit tests to Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb0d333d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb0d333d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb0d333d
Branch: refs/heads/master
Commit: eb0d333df23624f54aae2abb8d7c7873f8ed2a7a
Parents: 555842a
Author: huafengw <fv...@gmail.com>
Authored: Tue Mar 21 19:45:10 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Mar 23 19:52:11 2017 +0800
----------------------------------------------------------------------
examples/java/pom.xml | 12 +++
pom.xml | 6 ++
runners/gearpump/README.md | 41 ++++++++-
runners/gearpump/pom.xml | 2 -
.../gearpump/GearpumpRunnerRegistrar.java | 4 +-
.../translators/WindowAssignTranslator.java | 2 +-
.../gearpump/translators/io/ValuesSource.java | 2 -
.../gearpump/GearpumpRunnerRegistrarTest.java | 55 ++++++++++++
.../runners/gearpump/PipelineOptionsTest.java | 73 ++++++++++++++++
.../translators/io/GearpumpSourceTest.java | 90 ++++++++++++++++++++
.../gearpump/translators/io/ValueSoureTest.java | 82 ++++++++++++++++++
11 files changed, 362 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index ed4a1d4..0a6d8fe 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -87,6 +87,18 @@
</dependencies>
</profile>
+ <!-- Include the Apache Gearpump (incubating) runner with -P gearpump-runner -->
+ <profile>
+ <id>gearpump-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-gearpump</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
<!-- Include the Apache Flink runner with -P flink-runner -->
<profile>
<id>flink-runner</id>
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3b8476..2cdb09d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -475,6 +475,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-gearpump</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/README.md
----------------------------------------------------------------------
diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
index ad043fa..e8ce794 100644
--- a/runners/gearpump/README.md
+++ b/runners/gearpump/README.md
@@ -19,4 +19,43 @@
## Gearpump Beam Runner
-The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
\ No newline at end of file
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
+
+## Getting Started
+
+The following shows how to run the WordCount example that is provided with the Beam source code.
+
+### Installing Beam
+
+To get the latest version of Beam with Gearpump-Runner, first clone the Beam repository:
+
+```
+git clone https://github.com/apache/beam
+git -C beam checkout gearpump-runner
+```
+
+Then switch to the newly created directory and run Maven to build Apache Beam:
+
+```
+cd beam
+mvn clean install -DskipTests
+```
+
+Now Apache Beam and the Gearpump Runner are installed in your local Maven repository.
+
+### Running the WordCount Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Gearpump runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=TestGearpumpRunner" -Pgearpump-runner
+```
+
+Once completed, check the output file /tmp/wordcounts.txt-00000-of-00001
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 9a6a432..a691801 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -99,13 +99,11 @@
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-streaming_2.11</artifactId>
<version>${gearpump.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-core_2.11</artifactId>
<version>${gearpump.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
index b77e1e3..3183d45 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
@@ -44,7 +44,9 @@ public class GearpumpRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+ GearpumpRunner.class,
+ TestGearpumpRunner.class);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index fe6015a..29d8f02 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -39,7 +39,7 @@ import org.joda.time.Instant;
* {@link Window.Bound} is translated to Gearpump flatMap function.
*/
@SuppressWarnings("unchecked")
-public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
private static final long serialVersionUID = -964887482120489061L;
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index e0488cd..ccd5cdf 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -110,8 +110,6 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
this.source = source;
}
-
-
@Override
public boolean start() throws IOException {
if (null == iterator) {
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
new file mode 100644
index 0000000..9a01d20
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrarTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpRunnerRegistrar}.
+ *
+ * <p>Asserts that the {@code --runner} argument resolves to {@link GearpumpRunner} when given
+ * either the fully-qualified or the simple class name, and that
+ * {@link GearpumpRunnerRegistrar.Options} reports {@link GearpumpPipelineOptions}.
+ */
+public class GearpumpRunnerRegistrarTest {
+
+  /** {@code --runner=<fully-qualified class name>} must select {@link GearpumpRunner}. */
+  @Test
+  public void testFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", GearpumpRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    // JUnit's signature is assertEquals(expected, actual); keeping that order makes the
+    // failure message report the two values the right way round.
+    assertEquals(GearpumpRunner.class, opts.getRunner());
+  }
+
+  /** {@code --runner=<simple class name>} must select {@link GearpumpRunner}. */
+  @Test
+  public void testClassName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", GearpumpRunner.class.getSimpleName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    // Expected value first, actual value second (see testFullName).
+    assertEquals(GearpumpRunner.class, opts.getRunner());
+  }
+
+  /** The options registrar must expose exactly {@link GearpumpPipelineOptions}. */
+  @Test
+  public void testOptions() {
+    assertEquals(
+        ImmutableList.of(GearpumpPipelineOptions.class),
+        new GearpumpRunnerRegistrar.Options().getPipelineOptions());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
new file mode 100644
index 0000000..994856b
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpPipelineOptions}.
+ *
+ * <p>Round-trips the options through Jackson and checks which properties come back.
+ */
+public class PipelineOptionsTest {
+
+  /**
+   * Serializes fully populated options to JSON and reads them back: the embedded cluster and
+   * the serializers map are expected to be {@code null} afterwards, while the parallelism and
+   * the application name are expected to be preserved.
+   */
+  @Test
+  public void testIgnoredFieldSerialization() throws IOException {
+    String appName = "forTest";
+    Map<String, String> serializerMap = Maps.newHashMap();
+    serializerMap.put("classA", "SerializerA");
+    PipelineOptions baseOptions = PipelineOptionsFactory.create();
+    GearpumpPipelineOptions original = baseOptions.as(GearpumpPipelineOptions.class);
+    Config masterConfig = ClusterConfig.master(null);
+    EmbeddedCluster embeddedCluster = new EmbeddedCluster(masterConfig);
+    original.setSerializers(serializerMap);
+    original.setApplicationName(appName);
+    original.setEmbeddedCluster(embeddedCluster);
+    original.setParallelism(10);
+
+    byte[] json = serialize(original);
+    PipelineOptions parsed = new ObjectMapper().readValue(json, PipelineOptions.class);
+    GearpumpPipelineOptions restored = parsed.as(GearpumpPipelineOptions.class);
+
+    assertNull(restored.getEmbeddedCluster());
+    assertNull(restored.getSerializers());
+    assertEquals(10, restored.getParallelism());
+    assertEquals(appName, restored.getApplicationName());
+  }
+
+  /**
+   * Writes {@code obj} as JSON into an in-memory buffer and returns the bytes.
+   *
+   * @throws RuntimeException wrapping any exception raised while writing
+   */
+  private byte[] serialize(Object obj) {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.writeValue(buffer, obj);
+      return buffer.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
new file mode 100644
index 0000000..af5a1d2
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GearpumpSource}.
+ *
+ * <p>Drives a {@link GearpumpSource} backed by a {@link ValuesSource} and compares the
+ * watermark and message it yields for each element of a fixed list of timestamped values.
+ */
+public class GearpumpSourceTest {
+  private static final List<TimestampedValue<String>> TEST_VALUES = Lists.newArrayList(
+      TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+      TimestampedValue.of("b", new org.joda.time.Instant(0)),
+      TimestampedValue.of("c", new org.joda.time.Instant(53)),
+      TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1))
+  );
+
+  /** A {@link GearpumpSource} whose reader is obtained from a wrapped {@link ValuesSource}. */
+  private static class SourceForTest<T> extends GearpumpSource<T> {
+    private ValuesSource<T> valuesSource;
+
+    SourceForTest(PipelineOptions pipelineOptions, ValuesSource<T> delegate) {
+      super(pipelineOptions);
+      this.valuesSource = delegate;
+    }
+
+    @Override
+    protected Source.Reader<T> createReader(PipelineOptions options) throws IOException {
+      // The wrapped source is asked for a reader without a checkpoint mark.
+      return this.valuesSource.createReader(options, null);
+    }
+  }
+
+  /**
+   * Opens the source, then for every test value asserts the watermark and the next message,
+   * and finally asserts that the source is exhausted and the watermark is at its maximum.
+   */
+  @Test
+  public void testGearpumpSource() {
+    GearpumpPipelineOptions pipelineOptions = PipelineOptionsFactory.create()
+        .as(GearpumpPipelineOptions.class);
+    ValuesSource<TimestampedValue<String>> backingSource = new ValuesSource<>(TEST_VALUES,
+        TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
+    SourceForTest<TimestampedValue<String>> source =
+        new SourceForTest<>(pipelineOptions, backingSource);
+    source.open(null, Instant.EPOCH);
+
+    java.util.Iterator<TimestampedValue<String>> remaining = TEST_VALUES.iterator();
+    while (remaining.hasNext()) {
+      TimestampedValue<String> expected = remaining.next();
+      org.joda.time.Instant timestamp = expected.getTimestamp();
+
+      // Check the watermark first since the Source will advance when it's opened
+      Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(timestamp);
+      Assert.assertEquals(expectedWaterMark, source.getWatermark());
+
+      Message expectedMsg = Message.apply(
+          WindowedValue.timestampedValueInGlobalWindow(expected, expected.getTimestamp()),
+          expected.getTimestamp().getMillis());
+      Message actualMsg = source.read();
+      Assert.assertEquals(expectedMsg, actualMsg);
+    }
+
+    Assert.assertNull(source.read());
+    Assert.assertEquals(Watermark.MAX(), source.getWatermark());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb0d333d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
new file mode 100644
index 0000000..8c50703
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump.translators.io;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ValuesSource}.
+ *
+ * <p>Runs a pipeline that reads from a {@link ValuesSource} on an {@link EmbeddedCluster}
+ * and checks that every value reaches a downstream {@link DoFn}.
+ */
+public class ValueSoureTest {
+
+  /**
+   * Reads five strings from a {@link ValuesSource} using {@link GearpumpRunner} and asserts
+   * that the set of elements collected downstream equals the set of input values.
+   */
+  @Test
+  public void testValueSource() {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
+        .as(GearpumpPipelineOptions.class);
+    Config config = ClusterConfig.master(null);
+    // NOTE(review): presumably zero retries keeps a failing application from being
+    // restarted so the test fails fast - confirm against the Gearpump configuration docs.
+    config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
+        ConfigValueFactory.fromAnyRef(0));
+    EmbeddedCluster cluster = new EmbeddedCluster(config);
+    cluster.start();
+
+    List<String> values = Lists.newArrayList("1", "2", "3", "4", "5");
+    try {
+      options.setEmbeddedCluster(cluster);
+      options.setRunner(GearpumpRunner.class);
+      options.setParallelism(1);
+      Pipeline p = Pipeline.create(options);
+      ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of());
+      p.apply(Read.from(source))
+          .apply(ParDo.of(new ResultCollector()));
+
+      p.run().waitUntilFinish();
+    } finally {
+      // Stop the cluster even when building or running the pipeline throws, so a failing
+      // test does not leave the started cluster behind.
+      cluster.stop();
+    }
+
+    Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
+  }
+
+  /** Adds every element it processes to a static, synchronized set read by the test. */
+  private static class ResultCollector extends DoFn<Object, Void> {
+    // NOTE(review): presumably static so that the DoFn instances executed by the cluster
+    // share the set with the test method - confirm.
+    private static final Set<Object> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+}