You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/26 02:03:50 UTC
[2/2] beam git commit: Add Proto Definitions for the Artifact API
Add Proto Definitions for the Artifact API
Update the Job API to permit a "prepare" phase of executing a pipeline,
where prerequisite work like staging artifacts can be performed before
the job is executed.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e21f453
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e21f453
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e21f453
Branch: refs/heads/master
Commit: 1e21f453721cb7aef0783cb73d72f6b928685515
Parents: 0f77af8
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 17 17:45:09 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 25 19:03:37 2017 -0700
----------------------------------------------------------------------
.../src/main/proto/beam_artifact_api.proto | 122 +++++++++++++++++++
.../src/main/proto/beam_job_api.proto | 46 +++++--
2 files changed, 157 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1e21f453/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
new file mode 100644
index 0000000..6e39d88
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Artifact API, for communicating with a runner
+ * for artifact staging and retrieval over gRPC.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "ArtifactApi";
+
+// A service to stage artifacts for use in a Job.
+//
+// RPCs made to an ArtifactStagingService endpoint should include some form of identification for
+// the job as a header.
+service ArtifactStagingService {
+ // Stage an artifact to be available during job execution. The first request must contain the
+ // name of the artifact. All subsequent requests must contain sequential chunks of the content
+ // of the artifact.
+ rpc PutArtifact(stream PutArtifactRequest) returns (PutArtifactResponse);
+
+ // Commit the manifest for a Job. All artifacts must have been successfully uploaded
+ // before this call is made.
+ //
+ // Throws error INVALID_ARGUMENT if any member of the manifest is not present.
+ rpc CommitManifest(CommitManifestRequest) returns (CommitManifestResponse);
+}
+
+// A service to retrieve artifacts for use in a Job.
+service ArtifactRetrievalService {
+ // Get the manifest for the job
+ rpc GetManifest(GetManifestRequest) returns (GetManifestResponse);
+
+ // Get an artifact staged for the job. The requested artifact must be within the manifest
+ rpc GetArtifact(GetArtifactRequest) returns (stream ArtifactChunk);
+}
+
+// An artifact identifier and associated metadata.
+message Artifact {
+ // (Required) The name of the artifact.
+ string name = 1;
+
+ // (Optional) The Unix-like permissions of the artifact
+ int32 permissions = 2;
+
+ // (Optional) The md5 checksum of the artifact.
+ string md5 = 3;
+}
+
+// A collection of artifacts.
+message Manifest {
+ repeated Artifact artifact = 1;
+}
+
+// A request to get the manifest of a Job.
+message GetManifestRequest {}
+
+// A response containing a job manifest.
+message GetManifestResponse {
+ Manifest manifest = 1;
+}
+
+// A request to get an artifact. The artifact must be present in the manifest for the job.
+message GetArtifactRequest {
+ // (Required) The name of the artifact to retrieve.
+ string name = 1;
+}
+
+// Part of an artifact.
+message ArtifactChunk {
+ bytes data = 1;
+}
+
+// A request to stage an artifact.
+message PutArtifactRequest {
+ // (Required)
+ oneof content {
+ // The name of the artifact. The first message in a PutArtifact call must contain the name
+ // of the artifact.
+ string name = 1;
+
+ // A chunk of the artifact. All messages after the first in a PutArtifact call must contain a
+ // chunk.
+ ArtifactChunk data = 2;
+ }
+}
+
+message PutArtifactResponse {
+}
+
+// A request to commit the manifest for a Job. All artifacts must have been successfully uploaded
+// before this call is made.
+message CommitManifestRequest {
+ // (Required) The manifest to commit.
+ Manifest manifest = 1;
+}
+
+// The result of committing a manifest.
+message CommitManifestResponse {
+ // (Required) An opaque token representing the entirety of the staged artifacts.
+ string staging_token = 1;
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/1e21f453/sdks/common/runner-api/src/main/proto/beam_job_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_job_api.proto b/sdks/common/runner-api/src/main/proto/beam_job_api.proto
index 7be14cc..8946d2a 100644
--- a/sdks/common/runner-api/src/main/proto/beam_job_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_job_api.proto
@@ -34,36 +34,60 @@ import "google/protobuf/struct.proto";
// Job Service for running RunnerAPI pipelines
service JobService {
+ // Prepare a job for execution. The job will not be executed until a call is made to run with the
+ // returned preparationId.
+ rpc prepare (PrepareJobRequest) returns (PrepareJobResponse);
+
// Submit the job for execution
- rpc run (SubmitJobRequest) returns (SubmitJobResponse) {}
+ rpc run (RunJobRequest) returns (RunJobResponse);
// Get the current state of the job
- rpc getState (GetJobStateRequest) returns (GetJobStateResponse) {}
+ rpc getState (GetJobStateRequest) returns (GetJobStateResponse);
// Cancel the job
- rpc cancel (CancelJobRequest) returns (CancelJobResponse) {}
+ rpc cancel (CancelJobRequest) returns (CancelJobResponse);
// Subscribe to a stream of state changes of the job, will immediately return the current state of the job as the first response.
- rpc getStateStream (GetJobStateRequest) returns (stream GetJobStateResponse) {}
+ rpc getStateStream (GetJobStateRequest) returns (stream GetJobStateResponse);
// Subscribe to a stream of state changes and messages from the job
- rpc getMessageStream (JobMessagesRequest) returns (stream JobMessagesResponse) {}
+ rpc getMessageStream (JobMessagesRequest) returns (stream JobMessagesResponse);
}
-// Submit is a synchronus request that returns a jobId back
+// Prepare is a synchronous request that returns a preparationId back
// Throws error GRPC_STATUS_UNAVAILABLE if server is down
-// Throws error ALREADY_EXISTS if the jobName is reused as runners are permitted to deduplicate based on the name of the job.
+// Throws error ALREADY_EXISTS if the jobName is reused. Runners are permitted to deduplicate based on the name of the job.
// Throws error UNKNOWN for all other issues
-message SubmitJobRequest {
+message PrepareJobRequest {
org.apache.beam.runner_api.v1.Pipeline pipeline = 1; // (required)
google.protobuf.Struct pipelineOptions = 2; // (required)
string jobName = 3; // (required)
}
-message SubmitJobResponse {
- // JobId is used as an identifier for the job in all future calls.
- string jobId = 1; // (required)
+message PrepareJobResponse {
+ // (required) The ID used to associate calls made while preparing the job. preparationId is used
+ // to run the job, as well as in other pre-execution APIs such as Artifact staging.
+ string preparationId = 1;
+}
+
+
+// Run is a synchronous request that returns a jobId back.
+// Throws error GRPC_STATUS_UNAVAILABLE if server is down
+// Throws error NOT_FOUND if the preparation ID does not exist
+// Throws error UNKNOWN for all other issues
+message RunJobRequest {
+ // (required) The ID provided by an earlier call to prepare, identifying the job to run. All
+ // prerequisite tasks must have been completed.
+ string preparationId = 1;
+ // (optional) If any artifacts have been staged for this job, contains the staging_token returned
+ // from the CommitManifestResponse.
+ string stagingToken = 2;
+}
+
+
+message RunJobResponse {
+ string jobId = 1; // (required) The ID for the executing job
}