You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/05/04 06:01:58 UTC

[1/3] incubator-beam git commit: Add Matcher serializer in TestPipeline.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 6819dff86 -> 3f0eead50


Add Matcher serializer in TestPipeline.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b63fb94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b63fb94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b63fb94

Branch: refs/heads/master
Commit: 2b63fb9463a96972605ee92bc40080b0a16dfa80
Parents: 6819dff
Author: Jason Kuster <ja...@google.com>
Authored: Fri Apr 29 17:25:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 19:57:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   |  18 +-
 .../testing/TestDataflowPipelineRunner.java     |  19 +-
 .../testing/TestDataflowPipelineRunnerTest.java | 220 +++++++++++++++++++
 .../beam/sdk/testing/MatcherDeserializer.java   |  46 ++++
 .../beam/sdk/testing/MatcherSerializer.java     |  44 ++++
 .../beam/sdk/testing/SerializableMatcher.java   |   7 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  26 ++-
 .../beam/sdk/testing/TestPipelineOptions.java   |  39 ++++
 .../beam/sdk/testing/TestPipelineTest.java      |  53 +++++
 9 files changed, 439 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index 56ca98c..a09ec02 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,13 +18,7 @@
 
 package org.apache.beam.examples;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
 import org.apache.beam.examples.WordCount.WordCountOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -34,6 +28,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.Date;
+
 /**
  * End-to-end tests of WordCount.
  */
@@ -43,8 +39,7 @@ public class WordCountIT {
   /**
    * Options for the WordCount Integration Test.
    */
-  public static interface WordCountITOptions extends TestPipelineOptions,
-         WordCountOptions, DataflowPipelineOptions {
+  public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions {
   }
 
   @Test
@@ -52,13 +47,8 @@ public class WordCountIT {
     PipelineOptionsFactory.register(WordCountITOptions.class);
     WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
     options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
-        options.getJobName(), "output", "results"}));
+        String.format("WordCountIT-%tF-%<tH-%<tM-%<tS", new Date()), "output", "results"}));
 
     WordCount.main(TestPipeline.convertToArgs(options));
-    PipelineResult result =
-        TestDataflowPipelineRunner.getPipelineResultByJobName(options.getJobName());
-
-    assertNotNull("Result was null.", result);
-    assertEquals("Pipeline state was not done.", PipelineResult.State.DONE, result.getState());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index 3ab91f5..2b53a65 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.dataflow.testing;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+
 import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -39,16 +41,13 @@ import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -62,8 +61,6 @@ import java.util.concurrent.TimeUnit;
 public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-  private static final Map<String, PipelineResult> EXECUTION_RESULTS =
-      new ConcurrentHashMap<String, PipelineResult>();
 
   private final TestDataflowPipelineOptions options;
   private final DataflowPipelineRunner runner;
@@ -87,10 +84,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     return new TestDataflowPipelineRunner(dataflowOptions);
   }
 
-  public static PipelineResult getPipelineResultByJobName(String jobName) {
-    return EXECUTION_RESULTS.get(jobName);
-  }
-
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
     return run(pipeline, runner);
@@ -98,6 +91,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
 
   DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
 
+    TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
     final DataflowPipelineJob job;
     try {
       job = runner.run(pipeline);
@@ -108,6 +102,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     LOG.info("Running Dataflow job {} with {} expected assertions.",
         job.getJobId(), expectedNumberOfAssertions);
 
+    assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
     CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
         job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
 
@@ -151,6 +147,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
         throw new AssertionError(messageHandler.getErrorMessage() == null ?
             "The dataflow did not return a failure reason."
             : messageHandler.getErrorMessage());
+      } else {
+        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -161,7 +159,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    EXECUTION_RESULTS.put(options.getJobName(), job);
     return job;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
index a45284c..fbaf116 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.testing;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -36,10 +37,13 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
@@ -61,6 +65,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -74,6 +80,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
@@ -378,4 +385,217 @@ public class TestDataflowPipelineRunnerTest {
     // instead of inside the try-catch block.
     fail("AssertionError expected");
   }
+
+  @Test
+  public void testBatchOnCreateMatcher() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); // hands mockJob to the runner
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); // no waitToFinish calls yet
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner); // a matcher failure surfaces as an exception thrown from here
+  }
+
+  @Test
+  public void testStreamingOnCreateMatcher() throws Exception {
+    options.setStreaming(true); // same scenario as the batch variant, in streaming mode
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); // hands mockJob to the runner
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); // no waitToFinish calls yet
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner); // a matcher failure surfaces as an exception thrown from here
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); // hands mockJob to the runner
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); // exactly one waitToFinish call
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner); // a matcher failure surfaces as an exception thrown from here
+  }
+
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    options.setStreaming(true); // same scenario as the batch variant, in streaming mode
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); // hands mockJob to the runner
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); // exactly one waitToFinish call
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner); // a matcher failure surfaces as an exception thrown from here
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED); // the job reports failure
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestFailureMatcher()); // calls fail() if it is ever consulted
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) { // NOTE(review): also catches TestFailureMatcher's fail()
+      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+          any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+    options.setStreaming(true); // same scenario as the batch variant, in streaming mode
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED); // the job reports failure
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestFailureMatcher()); // calls fail() if it is ever consulted
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.FAILED);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) { // NOTE(review): also catches TestFailureMatcher's fail()
+      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+          any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    private final DataflowPipelineJob mockJob; // the exact instance matches() must be given
+    private final int called; // number of waitToFinish calls expected when matches() runs
+
+    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+      this.mockJob = job;
+      this.called = times;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof PipelineResult)) {
+        fail(String.format("Expected PipelineResult but received %s", o));
+      }
+      try {
+        verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
+            any(JobMessagesHandler.class));
+      } catch (IOException | InterruptedException e) { // checked exceptions caught for waitToFinish
+        throw new AssertionError(e);
+      }
+      assertSame(mockJob, o);
+      return true; // every failure path above throws instead of returning false
+    }
+
+    @Override
+    public void describeTo(Description description) { // left empty: nothing is appended
+    }
+  }
+
+  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    @Override
+    public boolean matches(Object o) { // any invocation at all is treated as a test failure
+      fail("OnSuccessMatcher should not be called on pipeline failure.");
+      return false; // unreachable in practice: fail() always throws
+    }
+
+    @Override
+    public void describeTo(Description description) { // left empty: nothing is appended
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
new file mode 100644
index 0000000..8498470
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherDeserializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import com.google.api.client.util.Base64;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer that decodes the Base64 "matcher" field back into a SerializableMatcher.
+ */
+class MatcherDeserializer extends JsonDeserializer<SerializableMatcher<?>> {
+  @Override
+  public SerializableMatcher<?> deserialize(JsonParser jsonParser,
+      DeserializationContext deserializationContext)
+      throws IOException, JsonProcessingException {
+    ObjectNode node = jsonParser.readValueAsTree(); // the whole JSON value is read as an object
+    String matcher = node.get("matcher").asText(); // NOTE(review): NPE if "matcher" is absent
+    byte[] in = Base64.decodeBase64(matcher);
+    return (SerializableMatcher<?>) SerializableUtils
+        .deserializeFromByteArray(in, "SerializableMatcher");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
new file mode 100644
index 0000000..0feeae0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import com.google.api.client.util.Base64;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer that emits a SerializableMatcher as a Base64 string in a "matcher" field.
+ */
+class MatcherSerializer extends JsonSerializer<SerializableMatcher<?>> {
+  @Override
+  public void serialize(SerializableMatcher<?> matcher, JsonGenerator jsonGenerator,
+      SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+    byte[] out = SerializableUtils.serializeToByteArray(matcher);
+    String encodedString = Base64.encodeBase64String(out); // bytes -> JSON-safe text
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeStringField("matcher", encodedString); // read by MatcherDeserializer
+    jsonGenerator.writeEndObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
index 9132db7..a465bbe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatcher.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.hamcrest.Matcher;
 
 import java.io.Serializable;
@@ -32,5 +34,8 @@ import java.io.Serializable;
  *
  * @param <T> The type of value matched.
  */
-interface SerializableMatcher<T> extends Matcher<T>, Serializable {
+@JsonSerialize(using = MatcherSerializer.class)
+@JsonDeserialize(using = MatcherDeserializer.class)
+public interface SerializableMatcher<T> extends Matcher<T>, Serializable {
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a51a24e..a4921d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -31,7 +31,11 @@ import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import org.junit.experimental.categories.Category;
 
@@ -39,7 +43,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
 
@@ -157,16 +161,24 @@ public class TestPipeline extends Pipeline {
 
   public static String[] convertToArgs(PipelineOptions options) {
     try {
-      Map<String, Object> stringOpts = (Map<String, Object>) MAPPER.readValue(
-          MAPPER.writeValueAsBytes(options), Map.class).get("options");
+      byte[] opts = MAPPER.writeValueAsBytes(options);
 
+      JsonParser jsonParser = MAPPER.getFactory().createParser(opts); // NOTE(review): never closed
+      TreeNode node = jsonParser.readValueAsTree();
+      ObjectNode optsNode = (ObjectNode) node.get("options"); // only the "options" subtree is used
       ArrayList<String> optArrayList = new ArrayList<>();
-      for (Map.Entry<String, Object> entry : stringOpts.entrySet()) {
-        optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+      Iterator<Entry<String, JsonNode>> entries = optsNode.fields();
+      while (entries.hasNext()) {
+        Entry<String, JsonNode> entry = entries.next();
+        if (entry.getValue().isTextual()) { // asText() yields the raw string, without JSON quotes
+          optArrayList.add("--" + entry.getKey() + "=" + entry.getValue().asText());
+        } else { // non-text nodes are appended through their toString() form
+          optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+        }
       }
       return optArrayList.toArray(new String[optArrayList.size()]);
-    } catch (Exception e) {
-      return null;
+    } catch (IOException e) { // rethrown unchecked rather than returning null
+      throw new IllegalStateException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 2599ae2..ff553ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -17,7 +17,12 @@
  */
 package org.apache.beam.sdk.testing;
 
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 
 /**
  * {@link TestPipelineOptions} is a set of options for test pipelines.
@@ -27,4 +32,38 @@ import org.apache.beam.sdk.options.PipelineOptions;
 public interface TestPipelineOptions extends PipelineOptions {
   String getTempRoot();
   void setTempRoot(String value);
+
+  @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  SerializableMatcher<PipelineResult> getOnCreateMatcher();
+  void setOnCreateMatcher(SerializableMatcher<PipelineResult> value);
+
+  @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  SerializableMatcher<PipelineResult> getOnSuccessMatcher();
+  void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value);
+
+  /**
+   * Default-value factory producing an {@link AlwaysPassMatcher}; the options argument is unused.
+   */
+  class AlwaysPassMatcherFactory
+      implements DefaultValueFactory<SerializableMatcher<PipelineResult>> {
+    @Override
+    public SerializableMatcher<PipelineResult> create(PipelineOptions options) {
+      return new AlwaysPassMatcher(); // a new instance on every call
+    }
+  }
+
+  /**
+   * Matcher that accepts every input; {@code describeTo} appends nothing.
+   */
+  class AlwaysPassMatcher extends BaseMatcher<PipelineResult>
+      implements SerializableMatcher<PipelineResult> {
+    @Override
+    public boolean matches(Object o) {
+      return true; // unconditional: the argument is never inspected
+    }
+
+    @Override
+    public void describeTo(Description description) { // left empty: nothing is appended
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b63fb94/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 9460e13..8af4ff2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -31,6 +32,8 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
@@ -38,7 +41,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
 /** Tests for {@link TestPipeline}. */
 @RunWith(JUnit4.class)
@@ -116,4 +121,52 @@ public class TestPipelineTest {
       return TestPipeline.create();
     }
   }
+
+  @Test
+  public void testMatcherSerializationDeserialization() {
+    TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class);
+    SerializableMatcher<PipelineResult> m1 = new TestMatcher();
+    SerializableMatcher<PipelineResult> m2 = new TestMatcher();
+
+    opts.setOnCreateMatcher(m1);
+    opts.setOnSuccessMatcher(m2);
+    // Round-trip the options through their command-line argument form.
+    String[] arr = TestPipeline.convertToArgs(opts);
+    TestPipelineOptions newOpts = PipelineOptionsFactory.fromArgs(arr)
+        .as(TestPipelineOptions.class);
+    // TestMatcher equality compares UUIDs, so a match shows the matcher state was restored.
+    assertEquals(m1, newOpts.getOnCreateMatcher());
+    assertEquals(m2, newOpts.getOnSuccessMatcher());
+  }
+
+  /** Matcher identified by a random UUID; used to test matcher serialization/deserialization. */
+  public static class TestMatcher extends BaseMatcher<PipelineResult>
+      implements SerializableMatcher<PipelineResult> {
+    private final UUID uuid = UUID.randomUUID();
+
+    @Override
+    public boolean matches(Object o) {
+      return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      String millis = String.format("%tL", new Date());
+      description.appendText(millis);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      // Equal only to another TestMatcher carrying the same UUID.
+      if (obj instanceof TestMatcher) {
+        return ((TestMatcher) obj).uuid.equals(uuid);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return uuid.hashCode();
+    }
+  }
 }


[2/3] incubator-beam git commit: [BEAM-256] This closes #273

Posted by lc...@apache.org.
[BEAM-256] This closes #273


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8fce48b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8fce48b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8fce48b9

Branch: refs/heads/master
Commit: 8fce48b9617350c3ffeb16d5d9c27e0e0be3738a
Parents: 6819dff 2b63fb9
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 3 19:58:09 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 19:58:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountIT.java   |  18 +-
 .../testing/TestDataflowPipelineRunner.java     |  19 +-
 .../testing/TestDataflowPipelineRunnerTest.java | 220 +++++++++++++++++++
 .../beam/sdk/testing/MatcherDeserializer.java   |  46 ++++
 .../beam/sdk/testing/MatcherSerializer.java     |  44 ++++
 .../beam/sdk/testing/SerializableMatcher.java   |   7 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  26 ++-
 .../beam/sdk/testing/TestPipelineOptions.java   |  39 ++++
 .../beam/sdk/testing/TestPipelineTest.java      |  53 +++++
 9 files changed, 439 insertions(+), 33 deletions(-)
----------------------------------------------------------------------



[3/3] incubator-beam git commit: [BEAM-256] Address wrong import order and add millis to output path for WordCountIT

Posted by lc...@apache.org.
[BEAM-256] Address wrong import order and add millis to output path for WordCountIT


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f0eead5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f0eead5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f0eead5

Branch: refs/heads/master
Commit: 3f0eead50525604221c5dc094d82d19d5230c4c5
Parents: 8fce48b
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 3 20:02:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 20:02:28 2016 -0700

----------------------------------------------------------------------
 .../java/src/test/java/org/apache/beam/examples/WordCountIT.java | 2 +-
 .../main/java/org/apache/beam/sdk/testing/MatcherSerializer.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f0eead5/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index a09ec02..503445e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -47,7 +47,7 @@ public class WordCountIT {
     PipelineOptionsFactory.register(WordCountITOptions.class);
     WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
     options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
-        String.format("WordCountIT-%tF-%<tH-%<tM-%<tS", new Date()), "output", "results"}));
+        String.format("WordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), "output", "results"}));
 
     WordCount.main(TestPipeline.convertToArgs(options));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f0eead5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
index 0feeae0..8452486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/MatcherSerializer.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.sdk.testing;
 
-import com.google.api.client.util.Base64;
-
 import org.apache.beam.sdk.util.SerializableUtils;
 
+import com.google.api.client.util.Base64;
+
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonSerializer;