You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/12/07 01:15:43 UTC

[1/4] incubator-beam git commit: [BEAM-1047] Update dataflow runner code to use DataflowClient wrapper.

Repository: incubator-beam
Updated Branches:
  refs/heads/master afedd68e8 -> b2b570f27


[BEAM-1047] Update dataflow runner code to use DataflowClient wrapper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce03f30c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce03f30c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce03f30c

Branch: refs/heads/master
Commit: ce03f30c1ee0b84ad2e7f10a6272ffb25548244a
Parents: e8c9686
Author: Pei He <pe...@google.com>
Authored: Mon Nov 28 11:47:42 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowClient.java   | 36 +++++++++++++-------
 .../runners/dataflow/DataflowPipelineJob.java   | 23 ++++++-------
 .../beam/runners/dataflow/DataflowRunner.java   | 16 +++------
 .../dataflow/testing/TestDataflowRunner.java    |  6 ++--
 .../runners/dataflow/util/MonitoringUtil.java   | 22 +++---------
 .../dataflow/DataflowPipelineJobTest.java       |  1 +
 .../transforms/DataflowGroupByKeyTest.java      | 14 +++++++-
 .../dataflow/transforms/DataflowViewTest.java   | 16 +++++++--
 .../dataflow/util/MonitoringUtilTest.java       | 21 +++---------
 9 files changed, 80 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
index f2081db..3536d72 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -35,27 +35,28 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 
 /**
- * Client library for {@link Dataflow}.
+ * Wrapper around the generated {@link Dataflow} client to provide common functionality.
  */
 public class DataflowClient {
 
   public static DataflowClient create(DataflowPipelineOptions options) {
-    return new DataflowClient(options.getDataflowClient(), options);
+    return new DataflowClient(options.getDataflowClient(), options.getProject());
   }
 
   private final Dataflow dataflow;
-  private final DataflowPipelineOptions options;
+  private final String projectId;
 
-  private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
+  private DataflowClient(Dataflow dataflow, String projectId) {
     this.dataflow = checkNotNull(dataflow, "dataflow");
-    this.options = checkNotNull(options, "options");
+    this.projectId = checkNotNull(projectId, "options");
   }
 
   /**
    * Creates the Dataflow {@link Job}.
    */
   public Job createJob(@Nonnull Job job) throws IOException {
-    Jobs.Create jobsCreate = dataflow.projects().jobs().create(options.getProject(), job);
+    checkNotNull(job, "job");
+    Jobs.Create jobsCreate = dataflow.projects().jobs().create(projectId, job);
     return jobsCreate.execute();
   }
 
@@ -65,7 +66,7 @@ public class DataflowClient {
    */
   public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException {
     Jobs.List jobsList = dataflow.projects().jobs()
-        .list(options.getProject())
+        .list(projectId)
         .setPageToken(pageToken);
     return jobsList.execute();
   }
@@ -74,8 +75,10 @@ public class DataflowClient {
    * Updates the Dataflow {@link Job} with the given {@code jobId}.
    */
   public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(content, "content");
     Jobs.Update jobsUpdate = dataflow.projects().jobs()
-        .update(options.getProject(), jobId, content);
+        .update(projectId, jobId, content);
     return jobsUpdate.execute();
   }
 
@@ -83,8 +86,9 @@ public class DataflowClient {
    * Gets the Dataflow {@link Job} with the given {@code jobId}.
    */
   public Job getJob(@Nonnull String jobId) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.Get jobsGet = dataflow.projects().jobs()
-        .get(options.getProject(), jobId);
+        .get(projectId, jobId);
     return jobsGet.execute();
   }
 
@@ -92,8 +96,9 @@ public class DataflowClient {
    * Gets the {@link JobMetrics} with the given {@code jobId}.
    */
   public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
-        .getMetrics(options.getProject(), jobId);
+        .getMetrics(projectId, jobId);
     return jobsGetMetrics.execute();
   }
 
@@ -102,8 +107,9 @@ public class DataflowClient {
    */
   public ListJobMessagesResponse listJobMessages(
       @Nonnull String jobId, @Nullable String pageToken) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
-        .list(options.getProject(), jobId)
+        .list(projectId, jobId)
         .setPageToken(pageToken);
     return jobMessagesList.execute();
   }
@@ -113,8 +119,10 @@ public class DataflowClient {
    */
   public LeaseWorkItemResponse leaseWorkItem(
       @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(request, "request");
     Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems()
-        .lease(options.getProject(), jobId, request);
+        .lease(projectId, jobId, request);
     return jobWorkItemsLease.execute();
   }
 
@@ -123,8 +131,10 @@ public class DataflowClient {
    */
   public ReportWorkItemStatusResponse reportWorkItemStatus(
       @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(request, "request");
     Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems()
-        .reportStatus(options.getProject(), jobId, request);
+        .reportStatus(projectId, jobId, request);
     return jobWorkItemsReportStatus.execute();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 58e85e0..00c88f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,10 +62,15 @@ public class DataflowPipelineJob implements PipelineResult {
   private String jobId;
 
   /**
+   * The {@link DataflowPipelineOptions} for the job.
+   */
+  private final DataflowPipelineOptions dataflowOptions;
+
+  /**
    * Client for the Dataflow service. This can be used to query the service
    * for information about the job.
    */
-  private DataflowPipelineOptions dataflowOptions;
+  private final DataflowClient dataflowClient;
 
   /**
    * The state the job terminated in or {@code null} if the job has not terminated.
@@ -124,6 +129,7 @@ public class DataflowPipelineJob implements PipelineResult {
       DataflowAggregatorTransforms aggregatorTransforms) {
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
+    this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
     this.aggregatorTransforms = aggregatorTransforms;
   }
 
@@ -241,7 +247,7 @@ public class DataflowPipelineJob implements PipelineResult {
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock) throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
+    MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
 
     long lastTimestamp = 0;
     BackOff backoff;
@@ -334,9 +340,7 @@ public class DataflowPipelineJob implements PipelineResult {
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
     try {
-      dataflowOptions.getDataflowClient().projects().jobs()
-          .update(getProjectId(), jobId, content)
-          .execute();
+      dataflowClient.updateJob(jobId, content);
       return State.CANCELLED;
     } catch (IOException e) {
       State state = getState();
@@ -401,11 +405,7 @@ public class DataflowPipelineJob implements PipelineResult {
     // Retry loop ends in return or throw
     while (true) {
       try {
-        Job job = dataflowOptions.getDataflowClient()
-            .projects()
-            .jobs()
-            .get(getProjectId(), jobId)
-            .execute();
+        Job job = dataflowClient.getJob(jobId);
         State currentState = MonitoringUtil.toState(job.getCurrentState());
         if (currentState.isTerminal()) {
           terminalState = currentState;
@@ -476,8 +476,7 @@ public class DataflowPipelineJob implements PipelineResult {
         metricUpdates = terminalMetricUpdates;
       } else {
         boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
-            .projects().jobs().getMetrics(getProjectId(), jobId).execute();
+        JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId);
         metricUpdates = jobMetrics.getMetrics();
         if (terminal && jobMetrics.getMetrics() != null) {
           terminalMetricUpdates = metricUpdates;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e781b4e..40d8948 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -32,7 +32,6 @@ import com.google.api.services.clouddebugger.v2.Clouddebugger;
 import com.google.api.services.clouddebugger.v2.model.Debuggee;
 import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
 import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
@@ -194,7 +193,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private final DataflowPipelineOptions options;
 
   /** Client for the Dataflow service. This is used to actually submit jobs. */
-  private final Dataflow dataflowClient;
+  private final DataflowClient dataflowClient;
 
   /** Translator for this DataflowRunner, based on options. */
   private final DataflowPipelineTranslator translator;
@@ -321,7 +320,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   @VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) {
     this.options = options;
-    this.dataflowClient = options.getDataflowClient();
+    this.dataflowClient = DataflowClient.create(options);
     this.translator = DataflowPipelineTranslator.fromOptions(options);
     this.pcollectionsRequiringIndexedFormat = new HashSet<>();
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
@@ -597,11 +596,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
     Job jobResult;
     try {
-      jobResult = dataflowClient
-              .projects()
-              .jobs()
-              .create(options.getProject(), newJob)
-              .execute();
+      jobResult = dataflowClient.createJob(newJob);
     } catch (GoogleJsonResponseException e) {
       String errorMessages = "Unexpected errors";
       if (e.getDetails() != null) {
@@ -2830,10 +2825,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       ListJobsResponse listResult;
       String token = null;
       do {
-        listResult = dataflowClient.projects().jobs()
-            .list(options.getProject())
-            .setPageToken(token)
-            .execute();
+        listResult = dataflowClient.listJobs(token);
         token = listResult.getNextPageToken();
         for (Job job : listResult.getJobs()) {
           if (job.getName().equals(jobName)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 70c3f58..4b0fcf2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -65,11 +66,13 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
 
   private final TestDataflowPipelineOptions options;
+  private final DataflowClient dataflowClient;
   private final DataflowRunner runner;
   private int expectedNumberOfAssertions = 0;
 
   TestDataflowRunner(TestDataflowPipelineOptions options) {
     this.options = options;
+    this.dataflowClient = DataflowClient.create(options);
     this.runner = DataflowRunner.fromOptions(options);
   }
 
@@ -279,8 +282,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   JobMetrics getJobMetrics(DataflowPipelineJob job) {
     JobMetrics metrics = null;
     try {
-      metrics = options.getDataflowClient().projects().jobs()
-          .getMetrics(job.getProjectId(), job.getJobId()).execute();
+      metrics = dataflowClient.getJobMetrics(job.getJobId());
     } catch (IOException e) {
       LOG.warn("Failed to get job metrics: ", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index efb6d2b..d0a24bf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util;
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
 
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 import com.google.common.base.MoreObjects;
@@ -35,6 +34,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.joda.time.Instant;
@@ -67,8 +67,7 @@ public final class MonitoringUtil {
   private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
   private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";
 
-  private String projectId;
-  private Messages messagesClient;
+  private final DataflowClient dataflowClient;
 
   /**
    * An interface that can be used for defining callbacks to receive a list
@@ -115,14 +114,8 @@ public final class MonitoringUtil {
   }
 
   /** Construct a helper for monitoring. */
-  public MonitoringUtil(String projectId, Dataflow dataflow) {
-    this(projectId, dataflow.projects().jobs().messages());
-  }
-
-  // @VisibleForTesting
-  MonitoringUtil(String projectId, Messages messagesClient) {
-    this.projectId = projectId;
-    this.messagesClient = messagesClient;
+  public MonitoringUtil(DataflowClient dataflowClient) {
+    this.dataflowClient = dataflowClient;
   }
 
   /**
@@ -157,12 +150,7 @@ public final class MonitoringUtil {
     ArrayList<JobMessage> allMessages = new ArrayList<>();
     String pageToken = null;
     while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
+      ListJobMessagesResponse response = dataflowClient.listJobMessages(jobId, pageToken);
       if (response == null || response.getJobMessages() == null) {
         return allMessages;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 323f762..1890da1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -157,6 +157,7 @@ public class DataflowPipelineJobTest {
     Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class);
     when(mockJobs.messages()).thenReturn(mockMessages);
     when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest);
+    when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
     when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index bb84d98..67408ae 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
+import com.google.api.services.dataflow.Dataflow;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -38,11 +39,14 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
@@ -50,6 +54,14 @@ public class DataflowGroupByKeyTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  @Mock
+  private Dataflow dataflow;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   /**
    * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
    * is not expanded. This is used for verifying that even without expansion the proper errors show
@@ -61,7 +73,7 @@ public class DataflowGroupByKeyTest {
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index ed3f2cd..b9220af 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
+import com.google.api.services.dataflow.Dataflow;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
@@ -36,12 +37,15 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link View} for a {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
@@ -49,13 +53,21 @@ public class DataflowViewTest {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
+  @Mock
+  private Dataflow dataflow;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   private Pipeline createTestBatchRunner() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 
@@ -66,7 +78,7 @@ public class DataflowViewTest {
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 6c5a2be..23ed26f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -19,16 +19,15 @@ package org.apache.beam.runners.dataflow.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -57,15 +56,7 @@ public class MonitoringUtilTest {
 
   @Test
   public void testGetJobMessages() throws IOException {
-    Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
-
-    // Two requests are needed to get all the messages.
-    Dataflow.Projects.Jobs.Messages.List firstRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-    Dataflow.Projects.Jobs.Messages.List secondRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-
-    when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+    DataflowClient dataflowClient = mock(DataflowClient.class);
 
     ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
     firstResponse.setJobMessages(new ArrayList<JobMessage>());
@@ -87,15 +78,13 @@ public class MonitoringUtilTest {
       secondResponse.getJobMessages().add(message);
     }
 
-    when(firstRequest.execute()).thenReturn(firstResponse);
-    when(secondRequest.execute()).thenReturn(secondResponse);
+    when(dataflowClient.listJobMessages(JOB_ID, null)).thenReturn(firstResponse);
+    when(dataflowClient.listJobMessages(JOB_ID, pageToken)).thenReturn(secondResponse);
 
-    MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+    MonitoringUtil util = new MonitoringUtil(dataflowClient);
 
     List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
 
-    verify(secondRequest).setPageToken(pageToken);
-
     assertEquals(150, messages.size());
   }
 


[3/4] incubator-beam git commit: [Code Health] Remove redundant projectId from DataflowPipelineJob.

Posted by bc...@apache.org.
[Code Health] Remove redundant projectId from DataflowPipelineJob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ded58832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ded58832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ded58832

Branch: refs/heads/master
Commit: ded58832ceaef487f4590d9396f09744288c955d
Parents: afedd68
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 16:14:27 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 22 +++------
 .../beam/runners/dataflow/DataflowRunner.java   |  4 +-
 .../dataflow/util/DataflowTemplateJob.java      |  2 +-
 .../dataflow/DataflowPipelineJobTest.java       | 48 ++++++++++----------
 .../testing/TestDataflowRunnerTest.java         | 36 +++++----------
 5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index a2b632f..58e85e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,11 +62,6 @@ public class DataflowPipelineJob implements PipelineResult {
   private String jobId;
 
   /**
-   * Google cloud project to associate this pipeline with.
-   */
-  private String projectId;
-
-  /**
    * Client for the Dataflow service. This can be used to query the service
    * for information about the job.
    */
@@ -119,17 +114,14 @@ public class DataflowPipelineJob implements PipelineResult {
   /**
    * Constructs the job.
    *
-   * @param projectId the project id
    * @param jobId the job id
    * @param dataflowOptions used to configure the client for the Dataflow Service
    * @param aggregatorTransforms a mapping from aggregators to PTransforms
    */
   public DataflowPipelineJob(
-      String projectId,
       String jobId,
       DataflowPipelineOptions dataflowOptions,
       DataflowAggregatorTransforms aggregatorTransforms) {
-    this.projectId = projectId;
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
     this.aggregatorTransforms = aggregatorTransforms;
@@ -146,7 +138,7 @@ public class DataflowPipelineJob implements PipelineResult {
    * Get the project this job exists in.
    */
   public String getProjectId() {
-    return projectId;
+    return dataflowOptions.getProject();
   }
 
   /**
@@ -249,7 +241,7 @@ public class DataflowPipelineJob implements PipelineResult {
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock) throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
+    MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
 
     long lastTimestamp = 0;
     BackOff backoff;
@@ -338,12 +330,12 @@ public class DataflowPipelineJob implements PipelineResult {
   @Override
   public State cancel() throws IOException {
     Job content = new Job();
-    content.setProjectId(projectId);
+    content.setProjectId(getProjectId());
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
     try {
       dataflowOptions.getDataflowClient().projects().jobs()
-          .update(projectId, jobId, content)
+          .update(getProjectId(), jobId, content)
           .execute();
       return State.CANCELLED;
     } catch (IOException e) {
@@ -412,13 +404,13 @@ public class DataflowPipelineJob implements PipelineResult {
         Job job = dataflowOptions.getDataflowClient()
             .projects()
             .jobs()
-            .get(projectId, jobId)
+            .get(getProjectId(), jobId)
             .execute();
         State currentState = MonitoringUtil.toState(job.getCurrentState());
         if (currentState.isTerminal()) {
           terminalState = currentState;
           replacedByJob = new DataflowPipelineJob(
-              getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+              job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
         }
         return job;
       } catch (IOException exn) {
@@ -485,7 +477,7 @@ public class DataflowPipelineJob implements PipelineResult {
       } else {
         boolean terminal = getState().isTerminal();
         JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
-            .projects().jobs().getMetrics(projectId, jobId).execute();
+            .projects().jobs().getMetrics(getProjectId(), jobId).execute();
         metricUpdates = jobMetrics.getMetrics();
         if (terminal && jobMetrics.getMetrics() != null) {
           terminalMetricUpdates = metricUpdates;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 339771b..e781b4e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -629,8 +629,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Use a raw client for post-launch monitoring, as status calls may fail
     // regularly and need not be retried automatically.
-    DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
-        options.getProject(), jobResult.getId(), options, aggregatorTransforms);
+    DataflowPipelineJob dataflowPipelineJob =
+        new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 2937184..1a44963 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
       "The result of template creation should not be used.";
 
   public DataflowTemplateJob() {
-    super(null, null, null, null);
+    super(null, null, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 0527b7c..323f762 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -115,6 +115,7 @@ public class DataflowPipelineJobTest {
 
     options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
     options.setDataflowClient(mockWorkflowClient);
+    options.setProject(PROJECT_ID);
   }
 
   /**
@@ -160,8 +161,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     State state = job.waitUntilFinish(
         Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -182,8 +183,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
   }
@@ -249,8 +250,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -269,8 +270,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
     assertEquals(null, state);
@@ -294,7 +295,7 @@ public class DataflowPipelineJobTest {
     FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+        JOB_ID, options, dataflowAggregatorTransforms);
     long startTime = clock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
     assertEquals(null, state);
@@ -317,7 +318,7 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+        JOB_ID, options, dataflowAggregatorTransforms);
 
     assertEquals(
         State.RUNNING,
@@ -333,8 +334,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
     assertEquals(
@@ -373,7 +374,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -408,7 +409,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -453,8 +454,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -521,8 +521,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -571,7 +570,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -589,7 +588,7 @@ public class DataflowPipelineJobTest {
         ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("not used in this pipeline");
@@ -624,8 +623,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(AggregatorRetrievalException.class);
     thrown.expectCause(is(cause));
@@ -690,7 +688,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenReturn(new Job());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     assertEquals(State.CANCELLED, job.cancel());
     Job content = new Job();
@@ -714,7 +712,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     thrown.expect(IOException.class);
     thrown.expectMessage("Failed to cancel the job, "
@@ -742,7 +740,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     assertEquals(State.FAILED, job.cancel());
     Job content = new Job();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index e6b513a..366c6a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -344,8 +344,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -359,8 +358,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -374,8 +372,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -389,8 +386,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -403,8 +399,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -417,8 +412,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -431,8 +425,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -446,8 +439,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -461,8 +453,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -476,8 +467,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -532,8 +522,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -549,8 +538,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatFailsForException() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 


[2/4] incubator-beam git commit: [BEAM-1047] Add DataflowClient wrapper on top of JSON library.

Posted by bc...@apache.org.
[BEAM-1047] Add DataflowClient wrapper on top of JSON library.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8c9686a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8c9686a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8c9686a

Branch: refs/heads/master
Commit: e8c9686a2e898d38afd692328eb171c542084747
Parents: ded5883
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 15:59:56 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowClient.java   | 130 +++++++++++++++++++
 1 file changed, 130 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8c9686a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
new file mode 100644
index 0000000..f2081db
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
+import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
+import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+
+/**
+ * Client library for {@link Dataflow}.
+ */
+public class DataflowClient {
+
+  public static DataflowClient create(DataflowPipelineOptions options) {
+    return new DataflowClient(options.getDataflowClient(), options);
+  }
+
+  private final Dataflow dataflow;
+  private final DataflowPipelineOptions options;
+
+  private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
+    this.dataflow = checkNotNull(dataflow, "dataflow");
+    this.options = checkNotNull(options, "options");
+  }
+
+  /**
+   * Creates the Dataflow {@link Job}.
+   */
+  public Job createJob(@Nonnull Job job) throws IOException {
+    Jobs.Create jobsCreate = dataflow.projects().jobs().create(options.getProject(), job);
+    return jobsCreate.execute();
+  }
+
+  /**
+   * Lists Dataflow {@link Job Jobs} in the project associated with
+   * the {@link DataflowPipelineOptions}.
+   */
+  public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException {
+    Jobs.List jobsList = dataflow.projects().jobs()
+        .list(options.getProject())
+        .setPageToken(pageToken);
+    return jobsList.execute();
+  }
+
+  /**
+   * Updates the Dataflow {@link Job} with the given {@code jobId}.
+   */
+  public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException {
+    Jobs.Update jobsUpdate = dataflow.projects().jobs()
+        .update(options.getProject(), jobId, content);
+    return jobsUpdate.execute();
+  }
+
+  /**
+   * Gets the Dataflow {@link Job} with the given {@code jobId}.
+   */
+  public Job getJob(@Nonnull String jobId) throws IOException {
+    Jobs.Get jobsGet = dataflow.projects().jobs()
+        .get(options.getProject(), jobId);
+    return jobsGet.execute();
+  }
+
+  /**
+   * Gets the {@link JobMetrics} with the given {@code jobId}.
+   */
+  public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
+    Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
+        .getMetrics(options.getProject(), jobId);
+    return jobsGetMetrics.execute();
+  }
+
+  /**
+   * Lists job messages with the given {@code jobId}.
+   */
+  public ListJobMessagesResponse listJobMessages(
+      @Nonnull String jobId, @Nullable String pageToken) throws IOException {
+    Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
+        .list(options.getProject(), jobId)
+        .setPageToken(pageToken);
+    return jobMessagesList.execute();
+  }
+
+  /**
+   * Leases the work item for {@code jobId}.
+   */
+  public LeaseWorkItemResponse leaseWorkItem(
+      @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException {
+    Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems()
+        .lease(options.getProject(), jobId, request);
+    return jobWorkItemsLease.execute();
+  }
+
+  /**
+   * Reports the status of the work item for {@code jobId}.
+   */
+  public ReportWorkItemStatusResponse reportWorkItemStatus(
+      @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException {
+    Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems()
+        .reportStatus(options.getProject(), jobId, request);
+    return jobWorkItemsReportStatus.execute();
+  }
+}


[4/4] incubator-beam git commit: Closes #1434

Posted by bc...@apache.org.
Closes #1434


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2b570f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2b570f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2b570f2

Branch: refs/heads/master
Commit: b2b570f27808b1671bf6cd0fc60f874da671d4ca
Parents: afedd68 ce03f30
Author: bchambers <bc...@google.com>
Authored: Tue Dec 6 17:08:13 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:13 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowClient.java   | 140 +++++++++++++++++++
 .../runners/dataflow/DataflowPipelineJob.java   |  31 ++--
 .../beam/runners/dataflow/DataflowRunner.java   |  20 +--
 .../dataflow/testing/TestDataflowRunner.java    |   6 +-
 .../dataflow/util/DataflowTemplateJob.java      |   2 +-
 .../runners/dataflow/util/MonitoringUtil.java   |  22 +--
 .../dataflow/DataflowPipelineJobTest.java       |  49 ++++---
 .../testing/TestDataflowRunnerTest.java         |  36 ++---
 .../transforms/DataflowGroupByKeyTest.java      |  14 +-
 .../dataflow/transforms/DataflowViewTest.java   |  16 ++-
 .../dataflow/util/MonitoringUtilTest.java       |  21 +--
 11 files changed, 235 insertions(+), 122 deletions(-)
----------------------------------------------------------------------