You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/12/07 01:15:43 UTC
[1/4] incubator-beam git commit: [BEAM-1047] Update dataflow runner
code to use DataflowClient wrapper.
Repository: incubator-beam
Updated Branches:
refs/heads/master afedd68e8 -> b2b570f27
[BEAM-1047] Update dataflow runner code to use DataflowClient wrapper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce03f30c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce03f30c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce03f30c
Branch: refs/heads/master
Commit: ce03f30c1ee0b84ad2e7f10a6272ffb25548244a
Parents: e8c9686
Author: Pei He <pe...@google.com>
Authored: Mon Nov 28 11:47:42 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowClient.java | 36 +++++++++++++-------
.../runners/dataflow/DataflowPipelineJob.java | 23 ++++++-------
.../beam/runners/dataflow/DataflowRunner.java | 16 +++------
.../dataflow/testing/TestDataflowRunner.java | 6 ++--
.../runners/dataflow/util/MonitoringUtil.java | 22 +++---------
.../dataflow/DataflowPipelineJobTest.java | 1 +
.../transforms/DataflowGroupByKeyTest.java | 14 +++++++-
.../dataflow/transforms/DataflowViewTest.java | 16 +++++++--
.../dataflow/util/MonitoringUtilTest.java | 21 +++---------
9 files changed, 80 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
index f2081db..3536d72 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -35,27 +35,28 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
/**
- * Client library for {@link Dataflow}.
+ * Wrapper around the generated {@link Dataflow} client to provide common functionality.
*/
public class DataflowClient {
public static DataflowClient create(DataflowPipelineOptions options) {
- return new DataflowClient(options.getDataflowClient(), options);
+ return new DataflowClient(options.getDataflowClient(), options.getProject());
}
private final Dataflow dataflow;
- private final DataflowPipelineOptions options;
+ private final String projectId;
- private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
+ private DataflowClient(Dataflow dataflow, String projectId) {
this.dataflow = checkNotNull(dataflow, "dataflow");
- this.options = checkNotNull(options, "options");
+ this.projectId = checkNotNull(projectId, "projectId");
}
/**
* Creates the Dataflow {@link Job}.
*/
public Job createJob(@Nonnull Job job) throws IOException {
- Jobs.Create jobsCreate = dataflow.projects().jobs().create(options.getProject(), job);
+ checkNotNull(job, "job");
+ Jobs.Create jobsCreate = dataflow.projects().jobs().create(projectId, job);
return jobsCreate.execute();
}
@@ -65,7 +66,7 @@ public class DataflowClient {
*/
public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException {
Jobs.List jobsList = dataflow.projects().jobs()
- .list(options.getProject())
+ .list(projectId)
.setPageToken(pageToken);
return jobsList.execute();
}
@@ -74,8 +75,10 @@ public class DataflowClient {
* Updates the Dataflow {@link Job} with the given {@code jobId}.
*/
public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException {
+ checkNotNull(jobId, "jobId");
+ checkNotNull(content, "content");
Jobs.Update jobsUpdate = dataflow.projects().jobs()
- .update(options.getProject(), jobId, content);
+ .update(projectId, jobId, content);
return jobsUpdate.execute();
}
@@ -83,8 +86,9 @@ public class DataflowClient {
* Gets the Dataflow {@link Job} with the given {@code jobId}.
*/
public Job getJob(@Nonnull String jobId) throws IOException {
+ checkNotNull(jobId, "jobId");
Jobs.Get jobsGet = dataflow.projects().jobs()
- .get(options.getProject(), jobId);
+ .get(projectId, jobId);
return jobsGet.execute();
}
@@ -92,8 +96,9 @@ public class DataflowClient {
* Gets the {@link JobMetrics} with the given {@code jobId}.
*/
public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
+ checkNotNull(jobId, "jobId");
Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
- .getMetrics(options.getProject(), jobId);
+ .getMetrics(projectId, jobId);
return jobsGetMetrics.execute();
}
@@ -102,8 +107,9 @@ public class DataflowClient {
*/
public ListJobMessagesResponse listJobMessages(
@Nonnull String jobId, @Nullable String pageToken) throws IOException {
+ checkNotNull(jobId, "jobId");
Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
- .list(options.getProject(), jobId)
+ .list(projectId, jobId)
.setPageToken(pageToken);
return jobMessagesList.execute();
}
@@ -113,8 +119,10 @@ public class DataflowClient {
*/
public LeaseWorkItemResponse leaseWorkItem(
@Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException {
+ checkNotNull(jobId, "jobId");
+ checkNotNull(request, "request");
Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems()
- .lease(options.getProject(), jobId, request);
+ .lease(projectId, jobId, request);
return jobWorkItemsLease.execute();
}
@@ -123,8 +131,10 @@ public class DataflowClient {
*/
public ReportWorkItemStatusResponse reportWorkItemStatus(
@Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException {
+ checkNotNull(jobId, "jobId");
+ checkNotNull(request, "request");
Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems()
- .reportStatus(options.getProject(), jobId, request);
+ .reportStatus(projectId, jobId, request);
return jobWorkItemsReportStatus.execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 58e85e0..00c88f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,10 +62,15 @@ public class DataflowPipelineJob implements PipelineResult {
private String jobId;
/**
+ * The {@link DataflowPipelineOptions} for the job.
+ */
+ private final DataflowPipelineOptions dataflowOptions;
+
+ /**
* Client for the Dataflow service. This can be used to query the service
* for information about the job.
*/
- private DataflowPipelineOptions dataflowOptions;
+ private final DataflowClient dataflowClient;
/**
* The state the job terminated in or {@code null} if the job has not terminated.
@@ -124,6 +129,7 @@ public class DataflowPipelineJob implements PipelineResult {
DataflowAggregatorTransforms aggregatorTransforms) {
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
+ this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
this.aggregatorTransforms = aggregatorTransforms;
}
@@ -241,7 +247,7 @@ public class DataflowPipelineJob implements PipelineResult {
MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock) throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
+ MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
long lastTimestamp = 0;
BackOff backoff;
@@ -334,9 +340,7 @@ public class DataflowPipelineJob implements PipelineResult {
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
- dataflowOptions.getDataflowClient().projects().jobs()
- .update(getProjectId(), jobId, content)
- .execute();
+ dataflowClient.updateJob(jobId, content);
return State.CANCELLED;
} catch (IOException e) {
State state = getState();
@@ -401,11 +405,7 @@ public class DataflowPipelineJob implements PipelineResult {
// Retry loop ends in return or throw
while (true) {
try {
- Job job = dataflowOptions.getDataflowClient()
- .projects()
- .jobs()
- .get(getProjectId(), jobId)
- .execute();
+ Job job = dataflowClient.getJob(jobId);
State currentState = MonitoringUtil.toState(job.getCurrentState());
if (currentState.isTerminal()) {
terminalState = currentState;
@@ -476,8 +476,7 @@ public class DataflowPipelineJob implements PipelineResult {
metricUpdates = terminalMetricUpdates;
} else {
boolean terminal = getState().isTerminal();
- JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
- .projects().jobs().getMetrics(getProjectId(), jobId).execute();
+ JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId);
metricUpdates = jobMetrics.getMetrics();
if (terminal && jobMetrics.getMetrics() != null) {
terminalMetricUpdates = metricUpdates;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e781b4e..40d8948 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -32,7 +32,6 @@ import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
-import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
@@ -194,7 +193,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private final DataflowPipelineOptions options;
/** Client for the Dataflow service. This is used to actually submit jobs. */
- private final Dataflow dataflowClient;
+ private final DataflowClient dataflowClient;
/** Translator for this DataflowRunner, based on options. */
private final DataflowPipelineTranslator translator;
@@ -321,7 +320,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) {
this.options = options;
- this.dataflowClient = options.getDataflowClient();
+ this.dataflowClient = DataflowClient.create(options);
this.translator = DataflowPipelineTranslator.fromOptions(options);
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
@@ -597,11 +596,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
Job jobResult;
try {
- jobResult = dataflowClient
- .projects()
- .jobs()
- .create(options.getProject(), newJob)
- .execute();
+ jobResult = dataflowClient.createJob(newJob);
} catch (GoogleJsonResponseException e) {
String errorMessages = "Unexpected errors";
if (e.getDetails() != null) {
@@ -2830,10 +2825,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
ListJobsResponse listResult;
String token = null;
do {
- listResult = dataflowClient.projects().jobs()
- .list(options.getProject())
- .setPageToken(token)
- .execute();
+ listResult = dataflowClient.listJobs(token);
token = listResult.getNextPageToken();
for (Job job : listResult.getJobs()) {
if (job.getName().equals(jobName)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 70c3f58..4b0fcf2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -65,11 +66,13 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
private final TestDataflowPipelineOptions options;
+ private final DataflowClient dataflowClient;
private final DataflowRunner runner;
private int expectedNumberOfAssertions = 0;
TestDataflowRunner(TestDataflowPipelineOptions options) {
this.options = options;
+ this.dataflowClient = DataflowClient.create(options);
this.runner = DataflowRunner.fromOptions(options);
}
@@ -279,8 +282,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
JobMetrics getJobMetrics(DataflowPipelineJob job) {
JobMetrics metrics = null;
try {
- metrics = options.getDataflowClient().projects().jobs()
- .getMetrics(job.getProjectId(), job.getJobId()).execute();
+ metrics = dataflowClient.getJobMetrics(job.getJobId());
} catch (IOException e) {
LOG.warn("Failed to get job metrics: ", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index efb6d2b..d0a24bf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util;
import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import com.google.common.base.MoreObjects;
@@ -35,6 +34,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
import org.joda.time.Instant;
@@ -67,8 +67,7 @@ public final class MonitoringUtil {
private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";
- private String projectId;
- private Messages messagesClient;
+ private final DataflowClient dataflowClient;
/**
* An interface that can be used for defining callbacks to receive a list
@@ -115,14 +114,8 @@ public final class MonitoringUtil {
}
/** Construct a helper for monitoring. */
- public MonitoringUtil(String projectId, Dataflow dataflow) {
- this(projectId, dataflow.projects().jobs().messages());
- }
-
- // @VisibleForTesting
- MonitoringUtil(String projectId, Messages messagesClient) {
- this.projectId = projectId;
- this.messagesClient = messagesClient;
+ public MonitoringUtil(DataflowClient dataflowClient) {
+ this.dataflowClient = dataflowClient;
}
/**
@@ -157,12 +150,7 @@ public final class MonitoringUtil {
ArrayList<JobMessage> allMessages = new ArrayList<>();
String pageToken = null;
while (true) {
- Messages.List listRequest = messagesClient.list(projectId, jobId);
- if (pageToken != null) {
- listRequest.setPageToken(pageToken);
- }
- ListJobMessagesResponse response = listRequest.execute();
-
+ ListJobMessagesResponse response = dataflowClient.listJobMessages(jobId, pageToken);
if (response == null || response.getJobMessages() == null) {
return allMessages;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 323f762..1890da1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -157,6 +157,7 @@ public class DataflowPipelineJobTest {
Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class);
when(mockJobs.messages()).thenReturn(mockMessages);
when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest);
+ when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index bb84d98..67408ae 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.transforms;
+import com.google.api.services.dataflow.Dataflow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -38,11 +39,14 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
/** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */
@RunWith(JUnit4.class)
@@ -50,6 +54,14 @@ public class DataflowGroupByKeyTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
+ @Mock
+ private Dataflow dataflow;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
/**
* Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
* is not expanded. This is used for verifying that even without expansion the proper errors show
@@ -61,7 +73,7 @@ public class DataflowGroupByKeyTest {
options.setProject("someproject");
options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
+ options.setDataflowClient(dataflow);
return Pipeline.create(options);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index ed3f2cd..b9220af 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.transforms;
+import com.google.api.services.dataflow.Dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
@@ -36,12 +37,15 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
/** Tests for {@link View} for a {@link DataflowRunner}. */
@RunWith(JUnit4.class)
@@ -49,13 +53,21 @@ public class DataflowViewTest {
@Rule
public transient ExpectedException thrown = ExpectedException.none();
+ @Mock
+ private Dataflow dataflow;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
private Pipeline createTestBatchRunner() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("someproject");
options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
+ options.setDataflowClient(dataflow);
return Pipeline.create(options);
}
@@ -66,7 +78,7 @@ public class DataflowViewTest {
options.setProject("someproject");
options.setGcpTempLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
+ options.setDataflowClient(dataflow);
return Pipeline.create(options);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 6c5a2be..23ed26f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -19,16 +19,15 @@ package org.apache.beam.runners.dataflow.util;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
import org.apache.beam.sdk.PipelineResult.State;
@@ -57,15 +56,7 @@ public class MonitoringUtilTest {
@Test
public void testGetJobMessages() throws IOException {
- Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
-
- // Two requests are needed to get all the messages.
- Dataflow.Projects.Jobs.Messages.List firstRequest =
- mock(Dataflow.Projects.Jobs.Messages.List.class);
- Dataflow.Projects.Jobs.Messages.List secondRequest =
- mock(Dataflow.Projects.Jobs.Messages.List.class);
-
- when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+ DataflowClient dataflowClient = mock(DataflowClient.class);
ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
firstResponse.setJobMessages(new ArrayList<JobMessage>());
@@ -87,15 +78,13 @@ public class MonitoringUtilTest {
secondResponse.getJobMessages().add(message);
}
- when(firstRequest.execute()).thenReturn(firstResponse);
- when(secondRequest.execute()).thenReturn(secondResponse);
+ when(dataflowClient.listJobMessages(JOB_ID, null)).thenReturn(firstResponse);
+ when(dataflowClient.listJobMessages(JOB_ID, pageToken)).thenReturn(secondResponse);
- MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+ MonitoringUtil util = new MonitoringUtil(dataflowClient);
List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
- verify(secondRequest).setPageToken(pageToken);
-
assertEquals(150, messages.size());
}
[3/4] incubator-beam git commit: [Code Health] Remove redundant
projectId from DataflowPipelineJob.
Posted by bc...@apache.org.
[Code Health] Remove redundant projectId from DataflowPipelineJob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ded58832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ded58832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ded58832
Branch: refs/heads/master
Commit: ded58832ceaef487f4590d9396f09744288c955d
Parents: afedd68
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 16:14:27 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 22 +++------
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../dataflow/util/DataflowTemplateJob.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 48 ++++++++++----------
.../testing/TestDataflowRunnerTest.java | 36 +++++----------
5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index a2b632f..58e85e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,11 +62,6 @@ public class DataflowPipelineJob implements PipelineResult {
private String jobId;
/**
- * Google cloud project to associate this pipeline with.
- */
- private String projectId;
-
- /**
* Client for the Dataflow service. This can be used to query the service
* for information about the job.
*/
@@ -119,17 +114,14 @@ public class DataflowPipelineJob implements PipelineResult {
/**
* Constructs the job.
*
- * @param projectId the project id
* @param jobId the job id
* @param dataflowOptions used to configure the client for the Dataflow Service
* @param aggregatorTransforms a mapping from aggregators to PTransforms
*/
public DataflowPipelineJob(
- String projectId,
String jobId,
DataflowPipelineOptions dataflowOptions,
DataflowAggregatorTransforms aggregatorTransforms) {
- this.projectId = projectId;
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
this.aggregatorTransforms = aggregatorTransforms;
@@ -146,7 +138,7 @@ public class DataflowPipelineJob implements PipelineResult {
* Get the project this job exists in.
*/
public String getProjectId() {
- return projectId;
+ return dataflowOptions.getProject();
}
/**
@@ -249,7 +241,7 @@ public class DataflowPipelineJob implements PipelineResult {
MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock) throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
+ MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
long lastTimestamp = 0;
BackOff backoff;
@@ -338,12 +330,12 @@ public class DataflowPipelineJob implements PipelineResult {
@Override
public State cancel() throws IOException {
Job content = new Job();
- content.setProjectId(projectId);
+ content.setProjectId(getProjectId());
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
dataflowOptions.getDataflowClient().projects().jobs()
- .update(projectId, jobId, content)
+ .update(getProjectId(), jobId, content)
.execute();
return State.CANCELLED;
} catch (IOException e) {
@@ -412,13 +404,13 @@ public class DataflowPipelineJob implements PipelineResult {
Job job = dataflowOptions.getDataflowClient()
.projects()
.jobs()
- .get(projectId, jobId)
+ .get(getProjectId(), jobId)
.execute();
State currentState = MonitoringUtil.toState(job.getCurrentState());
if (currentState.isTerminal()) {
terminalState = currentState;
replacedByJob = new DataflowPipelineJob(
- getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+ job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
}
return job;
} catch (IOException exn) {
@@ -485,7 +477,7 @@ public class DataflowPipelineJob implements PipelineResult {
} else {
boolean terminal = getState().isTerminal();
JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
- .projects().jobs().getMetrics(projectId, jobId).execute();
+ .projects().jobs().getMetrics(getProjectId(), jobId).execute();
metricUpdates = jobMetrics.getMetrics();
if (terminal && jobMetrics.getMetrics() != null) {
terminalMetricUpdates = metricUpdates;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 339771b..e781b4e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -629,8 +629,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
- DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
- options.getProject(), jobResult.getId(), options, aggregatorTransforms);
+ DataflowPipelineJob dataflowPipelineJob =
+ new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 2937184..1a44963 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
"The result of template creation should not be used.";
public DataflowTemplateJob() {
- super(null, null, null, null);
+ super(null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 0527b7c..323f762 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -115,6 +115,7 @@ public class DataflowPipelineJobTest {
options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
options.setDataflowClient(mockWorkflowClient);
+ options.setProject(PROJECT_ID);
}
/**
@@ -160,8 +161,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
State state = job.waitUntilFinish(
Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -182,8 +183,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
}
@@ -249,8 +250,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -269,8 +270,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
assertEquals(null, state);
@@ -294,7 +295,7 @@ public class DataflowPipelineJobTest {
FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ JOB_ID, options, dataflowAggregatorTransforms);
long startTime = clock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
assertEquals(null, state);
@@ -317,7 +318,7 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ JOB_ID, options, dataflowAggregatorTransforms);
assertEquals(
State.RUNNING,
@@ -333,8 +334,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
assertEquals(
@@ -373,7 +374,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -408,7 +409,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -453,8 +454,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -521,8 +521,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -571,7 +570,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -589,7 +588,7 @@ public class DataflowPipelineJobTest {
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("not used in this pipeline");
@@ -624,8 +623,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
thrown.expect(AggregatorRetrievalException.class);
thrown.expectCause(is(cause));
@@ -690,7 +688,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenReturn(new Job());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
assertEquals(State.CANCELLED, job.cancel());
Job content = new Job();
@@ -714,7 +712,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenThrow(new IOException());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
thrown.expect(IOException.class);
thrown.expectMessage("Failed to cancel the job, "
@@ -742,7 +740,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenThrow(new IOException());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
assertEquals(State.FAILED, job.cancel());
Job content = new Job();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index e6b513a..366c6a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -344,8 +344,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -359,8 +358,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -374,8 +372,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -389,8 +386,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -403,8 +399,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -417,8 +412,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -431,8 +425,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -446,8 +439,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -461,8 +453,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -476,8 +467,7 @@ public class TestDataflowRunnerTest {
@Test
public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -532,8 +522,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -549,8 +538,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatFailsForException() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
[2/4] incubator-beam git commit: [BEAM-1047] Add DataflowClient
wrapper on top of JSON library.
Posted by bc...@apache.org.
[BEAM-1047] Add DataflowClient wrapper on top of JSON library.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8c9686a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8c9686a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8c9686a
Branch: refs/heads/master
Commit: e8c9686a2e898d38afd692328eb171c542084747
Parents: ded5883
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 15:59:56 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowClient.java | 130 +++++++++++++++++++
1 file changed, 130 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8c9686a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
new file mode 100644
index 0000000..f2081db
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
+import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
+import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+
+/**
+ * Client library for {@link Dataflow}.
+ */
+public class DataflowClient {
+
+ public static DataflowClient create(DataflowPipelineOptions options) {
+ return new DataflowClient(options.getDataflowClient(), options);
+ }
+
+ private final Dataflow dataflow;
+ private final DataflowPipelineOptions options;
+
+ private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
+ this.dataflow = checkNotNull(dataflow, "dataflow");
+ this.options = checkNotNull(options, "options");
+ }
+
+ /**
+ * Creates the Dataflow {@link Job}.
+ */
+ public Job createJob(@Nonnull Job job) throws IOException {
+ Jobs.Create jobsCreate = dataflow.projects().jobs().create(options.getProject(), job);
+ return jobsCreate.execute();
+ }
+
+ /**
+ * Lists Dataflow {@link Job Jobs} in the project associated with
+ * the {@link DataflowPipelineOptions}.
+ */
+ public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException {
+ Jobs.List jobsList = dataflow.projects().jobs()
+ .list(options.getProject())
+ .setPageToken(pageToken);
+ return jobsList.execute();
+ }
+
+ /**
+ * Updates the Dataflow {@link Job} with the given {@code jobId}.
+ */
+ public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException {
+ Jobs.Update jobsUpdate = dataflow.projects().jobs()
+ .update(options.getProject(), jobId, content);
+ return jobsUpdate.execute();
+ }
+
+ /**
+ * Gets the Dataflow {@link Job} with the given {@code jobId}.
+ */
+ public Job getJob(@Nonnull String jobId) throws IOException {
+ Jobs.Get jobsGet = dataflow.projects().jobs()
+ .get(options.getProject(), jobId);
+ return jobsGet.execute();
+ }
+
+ /**
+ * Gets the {@link JobMetrics} with the given {@code jobId}.
+ */
+ public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
+ Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
+ .getMetrics(options.getProject(), jobId);
+ return jobsGetMetrics.execute();
+ }
+
+ /**
+ * Lists job messages with the given {@code jobId}.
+ */
+ public ListJobMessagesResponse listJobMessages(
+ @Nonnull String jobId, @Nullable String pageToken) throws IOException {
+ Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
+ .list(options.getProject(), jobId)
+ .setPageToken(pageToken);
+ return jobMessagesList.execute();
+ }
+
+ /**
+ * Leases the work item for {@code jobId}.
+ */
+ public LeaseWorkItemResponse leaseWorkItem(
+ @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException {
+ Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems()
+ .lease(options.getProject(), jobId, request);
+ return jobWorkItemsLease.execute();
+ }
+
+ /**
+ * Reports the status of the work item for {@code jobId}.
+ */
+ public ReportWorkItemStatusResponse reportWorkItemStatus(
+ @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException {
+ Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems()
+ .reportStatus(options.getProject(), jobId, request);
+ return jobWorkItemsReportStatus.execute();
+ }
+}
[4/4] incubator-beam git commit: Closes #1434
Posted by bc...@apache.org.
Closes #1434
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2b570f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2b570f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2b570f2
Branch: refs/heads/master
Commit: b2b570f27808b1671bf6cd0fc60f874da671d4ca
Parents: afedd68 ce03f30
Author: bchambers <bc...@google.com>
Authored: Tue Dec 6 17:08:13 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:13 2016 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowClient.java | 140 +++++++++++++++++++
.../runners/dataflow/DataflowPipelineJob.java | 31 ++--
.../beam/runners/dataflow/DataflowRunner.java | 20 +--
.../dataflow/testing/TestDataflowRunner.java | 6 +-
.../dataflow/util/DataflowTemplateJob.java | 2 +-
.../runners/dataflow/util/MonitoringUtil.java | 22 +--
.../dataflow/DataflowPipelineJobTest.java | 49 ++++---
.../testing/TestDataflowRunnerTest.java | 36 ++---
.../transforms/DataflowGroupByKeyTest.java | 14 +-
.../dataflow/transforms/DataflowViewTest.java | 16 ++-
.../dataflow/util/MonitoringUtilTest.java | 21 +--
11 files changed, 235 insertions(+), 122 deletions(-)
----------------------------------------------------------------------