You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/05/04 06:01:58 UTC
[1/3] incubator-beam git commit: Add Matcher serializer in TestPipeline.
Repository: incubator-beam
Updated Branches:
refs/heads/master 6819dff86 -> 3f0eead50
Add Matcher serializer in TestPipeline.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b63fb94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b63fb94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b63fb94
Branch: refs/heads/master
Commit: 2b63fb9463a96972605ee92bc40080b0a16dfa80
Parents: 6819dff
Author: Jason Kuster <ja...@google.com>
Authored: Fri Apr 29 17:25:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 19:57:18 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 18 +-
.../testing/TestDataflowPipelineRunner.java | 19 +-
.../testing/TestDataflowPipelineRunnerTest.java | 220 +++++++++++++++++++
.../beam/sdk/testing/MatcherDeserializer.java | 46 ++++
.../beam/sdk/testing/MatcherSerializer.java | 44 ++++
.../beam/sdk/testing/SerializableMatcher.java | 7 +-
.../apache/beam/sdk/testing/TestPipeline.java | 26 ++-
.../beam/sdk/testing/TestPipelineOptions.java | 39 ++++
.../beam/sdk/testing/TestPipelineTest.java | 53 +++++
9 files changed, 439 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 56ca98c..a09ec02 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,13 +18,7 @@
package org.apache.beam.examples;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
import org.apache.beam.examples.WordCount.WordCountOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -34,6 +28,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.util.Date;
+
/**
* End-to-end tests of WordCount.
*/
@@ -43,8 +39,7 @@ public class WordCountIT {
/**
* Options for the WordCount Integration Test.
*/
- public static interface WordCountITOptions extends TestPipelineOptions,
- WordCountOptions, DataflowPipelineOptions {
+ public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions {
}
@Test
@@ -52,13 +47,8 @@ public class WordCountIT {
PipelineOptionsFactory.register(WordCountITOptions.class);
WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
- options.getJobName(), "output", "results"}));
+ String.format("WordCountIT-%tF-%<tH-%<tM-%<tS", new Date()), "output", "results"}));
WordCount.main(TestPipeline.convertToArgs(options));
- PipelineResult result =
- TestDataflowPipelineRunner.getPipelineResultByJobName(options.getJobName());
-
- assertNotNull("Result was null.", result);
- assertEquals("Pipeline state was not done.", PipelineResult.State.DONE, result.getState());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index 3ab91f5..2b53a65 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -17,18 +17,20 @@
*/
package org.apache.beam.runners.dataflow.testing;
+import static org.hamcrest.MatcherAssert.assertThat;
+
import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -39,16 +41,13 @@ import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -62,8 +61,6 @@ import java.util.concurrent.TimeUnit;
public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
private static final String TENTATIVE_COUNTER = "tentative";
private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
- private static final Map<String, PipelineResult> EXECUTION_RESULTS =
- new ConcurrentHashMap<String, PipelineResult>();
private final TestDataflowPipelineOptions options;
private final DataflowPipelineRunner runner;
@@ -87,10 +84,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
return new TestDataflowPipelineRunner(dataflowOptions);
}
- public static PipelineResult getPipelineResultByJobName(String jobName) {
- return EXECUTION_RESULTS.get(jobName);
- }
-
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
return run(pipeline, runner);
@@ -98,6 +91,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
+ TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
final DataflowPipelineJob job;
try {
job = runner.run(pipeline);
@@ -108,6 +102,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
LOG.info("Running Dataflow job {} with {} expected assertions.",
job.getJobId(), expectedNumberOfAssertions);
+ assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
@@ -151,6 +147,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
throw new AssertionError(messageHandler.getErrorMessage() == null ?
"The dataflow did not return a failure reason."
: messageHandler.getErrorMessage());
+ } else {
+ assertThat(job, testPipelineOptions.getOnSuccessMatcher());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -161,7 +159,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
} catch (IOException e) {
throw new RuntimeException(e);
}
- EXECUTION_RESULTS.put(options.getJobName(), job);
return job;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
index a45284c..fbaf116 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.testing;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -36,10 +37,13 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
@@ -61,6 +65,8 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
@@ -74,6 +80,7 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@@ -378,4 +385,217 @@ public class TestDataflowPipelineRunnerTest {
// instead of inside the try-catch block.
fail("AssertionError expected");
}
+
+ @Test
+ public void testBatchOnCreateMatcher() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testStreamingOnCreateMatcher() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ return;
+ }
+ fail("Expected an exception on pipeline failure.");
+ }
+
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.FAILED);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ return;
+ }
+ fail("Expected an exception on pipeline failure.");
+ }
+
+ static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ private final DataflowPipelineJob mockJob;
+ private final int called;
+
+ public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+ this.mockJob = job;
+ this.called = times;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof PipelineResult)) {
+ fail(String.format("Expected PipelineResult but received %s", o));
+ }
+ try {
+ verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ } catch (IOException | InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ assertSame(mockJob, o);
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
+
+ static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ @Override
+ public boolean matches(Object o) {
+ fail("OnSuccessMatcher should not be called on pipeline failure.");
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
new file mode 100644
index 0000000..8498470
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import com.google.api.client.util.Base64;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * MatcherDeserializer is used with Jackson to enable deserialization of SerializableMatchers.
+ */
+class MatcherDeserializer extends JsonDeserializer<SerializableMatcher<?>> {
+ @Override
+ public SerializableMatcher<?> deserialize(JsonParser jsonParser,
+ DeserializationContext deserializationContext)
+ throws IOException, JsonProcessingException {
+ ObjectNode node = jsonParser.readValueAsTree();
+ String matcher = node.get("matcher").asText();
+ byte[] in = Base64.decodeBase64(matcher);
+ return (SerializableMatcher<?>) SerializableUtils
+ .deserializeFromByteArray(in, "SerializableMatcher");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
new file mode 100644
index 0000000..0feeae0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import com.google.api.client.util.Base64;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+
+/**
+ * MatcherSerializer is used with Jackson to enable serialization of SerializableMatchers.
+ */
+class MatcherSerializer extends JsonSerializer<SerializableMatcher<?>> {
+ @Override
+ public void serialize(SerializableMatcher<?> matcher, JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+ byte[] out = SerializableUtils.serializeToByteArray(matcher);
+ String encodedString = Base64.encodeBase64String(out);
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField("matcher", encodedString);
+ jsonGenerator.writeEndObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
index 9132db7..a465bbe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.testing;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.hamcrest.Matcher;
import java.io.Serializable;
@@ -32,5 +34,8 @@ import java.io.Serializable;
*
* @param <T> The type of value matched.
*/
-interface SerializableMatcher<T> extends Matcher<T>, Serializable {
+@JsonSerialize(using = MatcherSerializer.class)
+@JsonDeserialize(using = MatcherDeserializer.class)
+public interface SerializableMatcher<T> extends Matcher<T>, Serializable {
}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a51a24e..a4921d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -31,7 +31,11 @@ import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.experimental.categories.Category;
@@ -39,7 +43,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.Map;
+import java.util.Map.Entry;
import javax.annotation.Nullable;
@@ -157,16 +161,24 @@ public class TestPipeline extends Pipeline {
public static String[] convertToArgs(PipelineOptions options) {
try {
- Map<String, Object> stringOpts = (Map<String, Object>) MAPPER.readValue(
- MAPPER.writeValueAsBytes(options), Map.class).get("options");
+ byte[] opts = MAPPER.writeValueAsBytes(options);
+ JsonParser jsonParser = MAPPER.getFactory().createParser(opts);
+ TreeNode node = jsonParser.readValueAsTree();
+ ObjectNode optsNode = (ObjectNode) node.get("options");
ArrayList<String> optArrayList = new ArrayList<>();
- for (Map.Entry<String, Object> entry : stringOpts.entrySet()) {
- optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+ Iterator<Entry<String, JsonNode>> entries = optsNode.fields();
+ while (entries.hasNext()) {
+ Entry<String, JsonNode> entry = entries.next();
+ if (entry.getValue().isTextual()) {
+ optArrayList.add("--" + entry.getKey() + "=" + entry.getValue().asText());
+ } else {
+ optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+ }
}
return optArrayList.toArray(new String[optArrayList.size()]);
- } catch (Exception e) {
- return null;
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 2599ae2..ff553ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -17,7 +17,12 @@
*/
package org.apache.beam.sdk.testing;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
/**
* {@link TestPipelineOptions} is a set of options for test pipelines.
@@ -27,4 +32,38 @@ import org.apache.beam.sdk.options.PipelineOptions;
public interface TestPipelineOptions extends PipelineOptions {
String getTempRoot();
void setTempRoot(String value);
+
+ @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+ SerializableMatcher<PipelineResult> getOnCreateMatcher();
+ void setOnCreateMatcher(SerializableMatcher<PipelineResult> value);
+
+ @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+ SerializableMatcher<PipelineResult> getOnSuccessMatcher();
+ void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value);
+
+ /**
+ * Factory for {@link PipelineResult} matchers which always pass.
+ */
+ class AlwaysPassMatcherFactory
+ implements DefaultValueFactory<SerializableMatcher<PipelineResult>> {
+ @Override
+ public SerializableMatcher<PipelineResult> create(PipelineOptions options) {
+ return new AlwaysPassMatcher();
+ }
+ }
+
+ /**
+ * Matcher which will always pass.
+ */
+ class AlwaysPassMatcher extends BaseMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+ @Override
+ public boolean matches(Object o) {
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 9460e13..8af4ff2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -31,6 +32,8 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
@@ -38,7 +41,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
+import java.util.Date;
import java.util.List;
+import java.util.UUID;
/** Tests for {@link TestPipeline}. */
@RunWith(JUnit4.class)
@@ -116,4 +121,52 @@ public class TestPipelineTest {
return TestPipeline.create();
}
}
+
+ /**
+ * Round-trips the on-create and on-success matchers through
+ * {@link TestPipeline#convertToArgs} and {@link PipelineOptionsFactory#fromArgs}, then checks
+ * that the matchers read back from the rebuilt options equal the ones that were set.
+ */
+ @Test
+ public void testMatcherSerializationDeserialization() {
+ TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class);
+ // Parameterized rather than raw so the setter calls below are type-checked.
+ SerializableMatcher<PipelineResult> m1 = new TestMatcher();
+ SerializableMatcher<PipelineResult> m2 = new TestMatcher();
+
+ opts.setOnCreateMatcher(m1);
+ opts.setOnSuccessMatcher(m2);
+
+ String[] arr = TestPipeline.convertToArgs(opts);
+ TestPipelineOptions newOpts = PipelineOptionsFactory.fromArgs(arr)
+ .as(TestPipelineOptions.class);
+
+ // TestMatcher equality is based on its UUID, so these only hold if that id survived the trip.
+ assertEquals(m1, newOpts.getOnCreateMatcher());
+ assertEquals(m2, newOpts.getOnSuccessMatcher());
+ }
+
+ /**
+ * Matcher used to exercise matcher serialization/deserialization. Each instance carries a
+ * randomly generated {@link UUID}; equality and hashing are defined by that id alone.
+ */
+ public static class TestMatcher extends BaseMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+ private final UUID uuid = UUID.randomUUID();
+
+ @Override
+ public boolean matches(Object o) {
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ // "%tL" formats the millisecond-within-second of the current time.
+ Date now = new Date();
+ String millis = String.format("%tL", now);
+ description.appendText(millis);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // instanceof is false for null, so null and foreign types compare unequal.
+ return obj instanceof TestMatcher && ((TestMatcher) obj).uuid.equals(uuid);
+ }
+
+ @Override
+ public int hashCode() {
+ return uuid.hashCode();
+ }
+ }
}
[2/3] incubator-beam git commit: [BEAM-256] This closes #273
Posted by lc...@apache.org.
[BEAM-256] This closes #273
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8fce48b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8fce48b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8fce48b9
Branch: refs/heads/master
Commit: 8fce48b9617350c3ffeb16d5d9c27e0e0be3738a
Parents: 6819dff 2b63fb9
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 3 19:58:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 19:58:09 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCountIT.java | 18 +-
.../testing/TestDataflowPipelineRunner.java | 19 +-
.../testing/TestDataflowPipelineRunnerTest.java | 220 +++++++++++++++++++
.../beam/sdk/testing/MatcherDeserializer.java | 46 ++++
.../beam/sdk/testing/MatcherSerializer.java | 44 ++++
.../beam/sdk/testing/SerializableMatcher.java | 7 +-
.../apache/beam/sdk/testing/TestPipeline.java | 26 ++-
.../beam/sdk/testing/TestPipelineOptions.java | 39 ++++
.../beam/sdk/testing/TestPipelineTest.java | 53 +++++
9 files changed, 439 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
[3/3] incubator-beam git commit: [BEAM-256] Address wrong import
order and add millis to output path for WordCountIT
Posted by lc...@apache.org.
[BEAM-256] Address wrong import order and add millis to output path for WordCountIT
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f0eead5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f0eead5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f0eead5
Branch: refs/heads/master
Commit: 3f0eead50525604221c5dc094d82d19d5230c4c5
Parents: 8fce48b
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 3 20:02:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 20:02:28 2016 -0700
----------------------------------------------------------------------
.../java/src/test/java/org/apache/beam/examples/WordCountIT.java | 2 +-
.../main/java/org/apache/beam/sdk/testing/MatcherSerializer.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f0eead5/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index a09ec02..503445e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -47,7 +47,7 @@ public class WordCountIT {
PipelineOptionsFactory.register(WordCountITOptions.class);
WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
- String.format("WordCountIT-%tF-%<tH-%<tM-%<tS", new Date()), "output", "results"}));
+ String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), "output", "results"}));
WordCount.main(TestPipeline.convertToArgs(options));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f0eead5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
index 0feeae0..8452486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.sdk.testing;
-import com.google.api.client.util.Base64;
-
import org.apache.beam.sdk.util.SerializableUtils;
+import com.google.api.client.util.Base64;
+
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;