You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/06 15:53:56 UTC

[1/3] beam git commit: [BEAM-2877][BEAM-2881] Add Java SDK harness container image and support

Repository: beam
Updated Branches:
  refs/heads/master ed00299cc -> 63b54a5b0


http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/model/org_apache_beam_fn_v1/beam_provision_api.pb.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/org_apache_beam_fn_v1/beam_provision_api.pb.go b/sdks/go/pkg/beam/model/org_apache_beam_fn_v1/beam_provision_api.pb.go
new file mode 100644
index 0000000..534f65c
--- /dev/null
+++ b/sdks/go/pkg/beam/model/org_apache_beam_fn_v1/beam_provision_api.pb.go
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: beam_provision_api.proto
+
+/*
+Package org_apache_beam_fn_v1 is a generated protocol buffer package.
+
+It is generated from these files:
+	beam_provision_api.proto
+
+It has these top-level messages:
+	GetProvisionInfoRequest
+	GetProvisionInfoResponse
+	ProvisionInfo
+*/
+package org_apache_beam_fn_v1
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import google_protobuf "github.com/golang/protobuf/ptypes/struct"
+
+import (
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+// A request to get the provision info of an SDK harness worker instance. The message declares no fields.
+type GetProvisionInfoRequest struct {
+}
+
+func (m *GetProvisionInfoRequest) Reset()                    { *m = GetProvisionInfoRequest{} }
+func (m *GetProvisionInfoRequest) String() string            { return proto.CompactTextString(m) }
+func (*GetProvisionInfoRequest) ProtoMessage()               {}
+func (*GetProvisionInfoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+// A response containing the provision info of an SDK harness worker instance.
+type GetProvisionInfoResponse struct {
+	Info *ProvisionInfo `protobuf:"bytes,1,opt,name=info" json:"info,omitempty"`
+}
+
+func (m *GetProvisionInfoResponse) Reset()                    { *m = GetProvisionInfoResponse{} }
+func (m *GetProvisionInfoResponse) String() string            { return proto.CompactTextString(m) }
+func (*GetProvisionInfoResponse) ProtoMessage()               {}
+func (*GetProvisionInfoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *GetProvisionInfoResponse) GetInfo() *ProvisionInfo {
+	if m != nil {
+		return m.Info
+	}
+	return nil // nil receiver: report no provision info instead of dereferencing m
+}
+
+// Runtime provisioning information for an SDK harness worker instance,
+// such as pipeline options, resource constraints and other job metadata.
+type ProvisionInfo struct {
+	// (required) The job ID.
+	JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId" json:"job_id,omitempty"`
+	// (required) The job name.
+	JobName string `protobuf:"bytes,2,opt,name=job_name,json=jobName" json:"job_name,omitempty"`
+	// (required) Pipeline options. For non-template jobs, the options are
+	// identical to what is passed to job submission.
+	PipelineOptions *google_protobuf.Struct `protobuf:"bytes,3,opt,name=pipeline_options,json=pipelineOptions" json:"pipeline_options,omitempty"`
+}
+
+func (m *ProvisionInfo) Reset()                    { *m = ProvisionInfo{} }
+func (m *ProvisionInfo) String() string            { return proto.CompactTextString(m) }
+func (*ProvisionInfo) ProtoMessage()               {}
+func (*ProvisionInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *ProvisionInfo) GetJobId() string {
+	if m != nil {
+		return m.JobId
+	}
+	return "" // nil receiver: report the empty string
+}
+
+func (m *ProvisionInfo) GetJobName() string {
+	if m != nil {
+		return m.JobName
+	}
+	return "" // nil receiver: report the empty string
+}
+
+func (m *ProvisionInfo) GetPipelineOptions() *google_protobuf.Struct {
+	if m != nil {
+		return m.PipelineOptions
+	}
+	return nil // nil receiver: report no options
+}
+
+func init() { // registers each message type of this file under its fully-qualified proto name
+	proto.RegisterType((*GetProvisionInfoRequest)(nil), "org.apache.beam.fn.v1.GetProvisionInfoRequest")
+	proto.RegisterType((*GetProvisionInfoResponse)(nil), "org.apache.beam.fn.v1.GetProvisionInfoResponse")
+	proto.RegisterType((*ProvisionInfo)(nil), "org.apache.beam.fn.v1.ProvisionInfo")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for the ProvisionService service.
+
+type ProvisionServiceClient interface {
+	// Get provision information for the SDK harness worker instance.
+	GetProvisionInfo(ctx context.Context, in *GetProvisionInfoRequest, opts ...grpc.CallOption) (*GetProvisionInfoResponse, error)
+}
+
+type provisionServiceClient struct {
+	cc *grpc.ClientConn // connection handed to grpc.Invoke for every call made through this client
+}
+
+func NewProvisionServiceClient(cc *grpc.ClientConn) ProvisionServiceClient {
+	return &provisionServiceClient{cc}
+}
+
+func (c *provisionServiceClient) GetProvisionInfo(ctx context.Context, in *GetProvisionInfoRequest, opts ...grpc.CallOption) (*GetProvisionInfoResponse, error) {
+	out := new(GetProvisionInfoResponse)
+	err := grpc.Invoke(ctx, "/org.apache.beam.fn.v1.ProvisionService/GetProvisionInfo", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err // the response object is dropped on failure; callers get only the error
+	}
+	return out, nil
+}
+
+// Server API for the ProvisionService service.
+
+type ProvisionServiceServer interface {
+	// Get provision information for the SDK harness worker instance.
+	GetProvisionInfo(context.Context, *GetProvisionInfoRequest) (*GetProvisionInfoResponse, error)
+}
+
+func RegisterProvisionServiceServer(s *grpc.Server, srv ProvisionServiceServer) {
+	s.RegisterService(&_ProvisionService_serviceDesc, srv) // binds srv to s using the ProvisionService descriptor
+}
+
+func _ProvisionService_GetProvisionInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(GetProvisionInfoRequest)
+	if err := dec(in); err != nil { // dec fills in with the incoming request; a decode failure ends the call
+		return nil, err
+	}
+	if interceptor == nil { // no interceptor: call the service implementation directly
+		return srv.(ProvisionServiceServer).GetProvisionInfo(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/org.apache.beam.fn.v1.ProvisionService/GetProvisionInfo",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) { // passed to the interceptor, which invokes it with the request
+		return srv.(ProvisionServiceServer).GetProvisionInfo(ctx, req.(*GetProvisionInfoRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _ProvisionService_serviceDesc = grpc.ServiceDesc{ // descriptor passed to RegisterService by RegisterProvisionServiceServer
+	ServiceName: "org.apache.beam.fn.v1.ProvisionService",
+	HandlerType: (*ProvisionServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "GetProvisionInfo",
+			Handler:    _ProvisionService_GetProvisionInfo_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{}, // this service declares no streaming methods
+	Metadata: "beam_provision_api.proto",
+}
+
+func init() { proto.RegisterFile("beam_provision_api.proto", fileDescriptor0) } // registers the descriptor bytes under the .proto file name
+
+var fileDescriptor0 = []byte{
+	// 284 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xcf, 0x4b, 0xc3, 0x30,
+	0x14, 0xc7, 0xa9, 0x3f, 0xa6, 0x46, 0xc5, 0x12, 0x18, 0xeb, 0x86, 0x07, 0x29, 0x0a, 0x9e, 0x52,
+	0x9c, 0x17, 0xaf, 0xf6, 0x22, 0xbb, 0xa8, 0x74, 0xde, 0x4b, 0xd2, 0xbd, 0xd6, 0x94, 0x35, 0x2f,
+	0x36, 0x69, 0xff, 0x03, 0xf1, 0xdf, 0x96, 0xa5, 0x74, 0x30, 0xdd, 0x60, 0xd7, 0xf7, 0x3e, 0xdf,
+	0x97, 0xf7, 0x79, 0x21, 0x81, 0x00, 0x5e, 0xa5, 0xba, 0xc6, 0x56, 0x1a, 0x89, 0x2a, 0xe5, 0x5a,
+	0x32, 0x5d, 0xa3, 0x45, 0x3a, 0xc4, 0xba, 0x60, 0x5c, 0xf3, 0xec, 0x13, 0xd8, 0x0a, 0x62, 0xb9,
+	0x62, 0xed, 0xc3, 0xe4, 0xba, 0x40, 0x2c, 0x96, 0x10, 0x39, 0x48, 0x34, 0x79, 0x64, 0x6c, 0xdd,
+	0x64, 0xb6, 0x0b, 0x85, 0x63, 0x32, 0x7a, 0x01, 0xfb, 0xde, 0x8f, 0x9b, 0xa9, 0x1c, 0x13, 0xf8,
+	0x6a, 0xc0, 0xd8, 0xf0, 0x83, 0x04, 0xff, 0x5b, 0x46, 0xa3, 0x32, 0x40, 0x9f, 0xc8, 0x91, 0x54,
+	0x39, 0x06, 0xde, 0x8d, 0x77, 0x7f, 0x3e, 0xbd, 0x65, 0x5b, 0x9f, 0x66, 0x9b, 0x59, 0x97, 0x08,
+	0xbf, 0x3d, 0x72, 0xb9, 0x51, 0xa7, 0x43, 0x32, 0x28, 0x51, 0xa4, 0x72, 0xe1, 0xa6, 0x9d, 0x25,
+	0xc7, 0x25, 0x8a, 0xd9, 0x82, 0x8e, 0xc9, 0xe9, 0xaa, 0xac, 0x78, 0x05, 0xc1, 0x81, 0x6b, 0x9c,
+	0x94, 0x28, 0x5e, 0x79, 0x05, 0x34, 0x26, 0xbe, 0x96, 0x1a, 0x96, 0x52, 0x41, 0x8a, 0xda, 0x4a,
+	0x54, 0x26, 0x38, 0x74, 0x9b, 0x8c, 0x58, 0x67, 0xcb, 0x7a, 0x5b, 0x36, 0x77, 0xb6, 0xc9, 0x55,
+	0x1f, 0x78, 0xeb, 0xf8, 0xe9, 0x8f, 0x47, 0xfc, 0xf5, 0x1e, 0x73, 0xa8, 0x5b, 0x99, 0x01, 0x35,
+	0xc4, 0xff, 0xab, 0x4c, 0xd9, 0x0e, 0xb9, 0x1d, 0x67, 0x9b, 0x44, 0x7b, 0xf3, 0xdd, 0x2d, 0xe3,
+	0x3b, 0xb2, 0xfd, 0xe7, 0xe2, 0x8b, 0x35, 0xff, 0xac, 0xa5, 0x18, 0x38, 0xa5, 0xc7, 0xdf, 0x00,
+	0x00, 0x00, 0xff, 0xff, 0x97, 0x2b, 0xbd, 0x57, 0x01, 0x02, 0x00, 0x00,
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go b/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
new file mode 100644
index 0000000..e8cc800
--- /dev/null
+++ b/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: beam_artifact_api.proto
+
+/*
+Package org_apache_beam_runner_api_v1 is a generated protocol buffer package.
+
+It is generated from these files:
+	beam_artifact_api.proto
+
+It has these top-level messages:
+	ArtifactMetadata
+	Manifest
+	ProxyManifest
+	GetManifestRequest
+	GetManifestResponse
+	GetArtifactRequest
+	ArtifactChunk
+	PutArtifactRequest
+	PutArtifactResponse
+	CommitManifestRequest
+	CommitManifestResponse
+*/
+package org_apache_beam_runner_api_v1
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import (
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+// An artifact identifier and associated metadata.
+type ArtifactMetadata struct {
+	// (Required) The name of the artifact.
+	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+	// (Optional) The Unix-like permissions of the artifact.
+	Permissions uint32 `protobuf:"varint,2,opt,name=permissions" json:"permissions,omitempty"`
+	// (Optional) The md5 checksum of the artifact. Used, among other things, by harness boot code to
+	// validate the integrity of the artifact.
+	Md5 string `protobuf:"bytes,3,opt,name=md5" json:"md5,omitempty"`
+}
+
+func (m *ArtifactMetadata) Reset()                    { *m = ArtifactMetadata{} }
+func (m *ArtifactMetadata) String() string            { return proto.CompactTextString(m) }
+func (*ArtifactMetadata) ProtoMessage()               {}
+func (*ArtifactMetadata) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *ArtifactMetadata) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return "" // nil receiver: report the empty string
+}
+
+func (m *ArtifactMetadata) GetPermissions() uint32 {
+	if m != nil {
+		return m.Permissions
+	}
+	return 0 // nil receiver: report zero
+}
+
+func (m *ArtifactMetadata) GetMd5() string {
+	if m != nil {
+		return m.Md5
+	}
+	return "" // nil receiver: report the empty string
+}
+
+// A collection of artifacts, each described by an ArtifactMetadata entry.
+type Manifest struct {
+	Artifact []*ArtifactMetadata `protobuf:"bytes,1,rep,name=artifact" json:"artifact,omitempty"`
+}
+
+func (m *Manifest) Reset()                    { *m = Manifest{} }
+func (m *Manifest) String() string            { return proto.CompactTextString(m) }
+func (*Manifest) ProtoMessage()               {}
+func (*Manifest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *Manifest) GetArtifact() []*ArtifactMetadata {
+	if m != nil {
+		return m.Artifact
+	}
+	return nil // nil receiver: report no artifacts
+}
+
+// A manifest with location information: a Manifest plus a list of name/URI Location entries.
+type ProxyManifest struct {
+	Manifest *Manifest                 `protobuf:"bytes,1,opt,name=manifest" json:"manifest,omitempty"`
+	Location []*ProxyManifest_Location `protobuf:"bytes,2,rep,name=location" json:"location,omitempty"`
+}
+
+func (m *ProxyManifest) Reset()                    { *m = ProxyManifest{} }
+func (m *ProxyManifest) String() string            { return proto.CompactTextString(m) }
+func (*ProxyManifest) ProtoMessage()               {}
+func (*ProxyManifest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *ProxyManifest) GetManifest() *Manifest {
+	if m != nil {
+		return m.Manifest
+	}
+	return nil // nil receiver: report no manifest
+}
+
+func (m *ProxyManifest) GetLocation() []*ProxyManifest_Location {
+	if m != nil {
+		return m.Location
+	}
+	return nil // nil receiver: report no locations
+}
+
+type ProxyManifest_Location struct { // one name/URI pair; presumably Name matches an ArtifactMetadata.Name (confirm against the .proto)
+	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+	Uri  string `protobuf:"bytes,2,opt,name=uri" json:"uri,omitempty"`
+}
+
+func (m *ProxyManifest_Location) Reset()                    { *m = ProxyManifest_Location{} }
+func (m *ProxyManifest_Location) String() string            { return proto.CompactTextString(m) }
+func (*ProxyManifest_Location) ProtoMessage()               {}
+func (*ProxyManifest_Location) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2, 0} }
+
+func (m *ProxyManifest_Location) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return "" // nil receiver: report the empty string
+}
+
+func (m *ProxyManifest_Location) GetUri() string {
+	if m != nil {
+		return m.Uri
+	}
+	return "" // nil receiver: report the empty string
+}
+
+// A request to get the manifest of a Job. The message declares no fields.
+type GetManifestRequest struct {
+}
+
+func (m *GetManifestRequest) Reset()                    { *m = GetManifestRequest{} }
+func (m *GetManifestRequest) String() string            { return proto.CompactTextString(m) }
+func (*GetManifestRequest) ProtoMessage()               {}
+func (*GetManifestRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
+
+// A response containing a job manifest.
+type GetManifestResponse struct {
+	Manifest *Manifest `protobuf:"bytes,1,opt,name=manifest" json:"manifest,omitempty"`
+}
+
+func (m *GetManifestResponse) Reset()                    { *m = GetManifestResponse{} }
+func (m *GetManifestResponse) String() string            { return proto.CompactTextString(m) }
+func (*GetManifestResponse) ProtoMessage()               {}
+func (*GetManifestResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
+
+func (m *GetManifestResponse) GetManifest() *Manifest {
+	if m != nil {
+		return m.Manifest
+	}
+	return nil // nil receiver: report no manifest
+}
+
+// A request to get an artifact. The artifact must be present in the manifest for the job.
+type GetArtifactRequest struct {
+	// (Required) The name of the artifact to retrieve.
+	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+}
+
+func (m *GetArtifactRequest) Reset()                    { *m = GetArtifactRequest{} }
+func (m *GetArtifactRequest) String() string            { return proto.CompactTextString(m) }
+func (*GetArtifactRequest) ProtoMessage()               {}
+func (*GetArtifactRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
+
+func (m *GetArtifactRequest) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return "" // nil receiver: report the empty string
+}
+
+// Part of an artifact: one slice of raw bytes carried in Data.
+type ArtifactChunk struct {
+	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (m *ArtifactChunk) Reset()                    { *m = ArtifactChunk{} }
+func (m *ArtifactChunk) String() string            { return proto.CompactTextString(m) }
+func (*ArtifactChunk) ProtoMessage()               {}
+func (*ArtifactChunk) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
+
+func (m *ArtifactChunk) GetData() []byte {
+	if m != nil {
+		return m.Data
+	}
+	return nil // nil receiver: report no bytes
+}
+
+// A request to stage an artifact. Its content is either artifact metadata or a data chunk.
+type PutArtifactRequest struct {
+	// (Required) Holds at most one of the wrapper types below; nil when unset.
+	//
+	// Types that are valid to be assigned to Content:
+	//	*PutArtifactRequest_Metadata
+	//	*PutArtifactRequest_Data
+	Content isPutArtifactRequest_Content `protobuf_oneof:"content"`
+}
+
+func (m *PutArtifactRequest) Reset()                    { *m = PutArtifactRequest{} }
+func (m *PutArtifactRequest) String() string            { return proto.CompactTextString(m) }
+func (*PutArtifactRequest) ProtoMessage()               {}
+func (*PutArtifactRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
+
+type isPutArtifactRequest_Content interface { // marker interface satisfied only by the two wrapper types below
+	isPutArtifactRequest_Content()
+}
+
+type PutArtifactRequest_Metadata struct {
+	Metadata *ArtifactMetadata `protobuf:"bytes,1,opt,name=metadata,oneof"`
+}
+type PutArtifactRequest_Data struct {
+	Data *ArtifactChunk `protobuf:"bytes,2,opt,name=data,oneof"`
+}
+
+func (*PutArtifactRequest_Metadata) isPutArtifactRequest_Content() {}
+func (*PutArtifactRequest_Data) isPutArtifactRequest_Content()     {}
+
+func (m *PutArtifactRequest) GetContent() isPutArtifactRequest_Content {
+	if m != nil {
+		return m.Content
+	}
+	return nil // nil receiver: report no content
+}
+
+func (m *PutArtifactRequest) GetMetadata() *ArtifactMetadata {
+	if x, ok := m.GetContent().(*PutArtifactRequest_Metadata); ok {
+		return x.Metadata
+	}
+	return nil // content is unset or holds the other variant
+}
+
+func (m *PutArtifactRequest) GetData() *ArtifactChunk {
+	if x, ok := m.GetContent().(*PutArtifactRequest_Data); ok {
+		return x.Data
+	}
+	return nil // content is unset or holds the other variant
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package. It returns the oneof marshaler, unmarshaler, sizer and the wrapper types.
+func (*PutArtifactRequest) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _PutArtifactRequest_OneofMarshaler, _PutArtifactRequest_OneofUnmarshaler, _PutArtifactRequest_OneofSizer, []interface{}{
+		(*PutArtifactRequest_Metadata)(nil),
+		(*PutArtifactRequest_Data)(nil),
+	}
+}
+
+func _PutArtifactRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*PutArtifactRequest)
+	// content: write the field key (field number<<3 | wire type), then the nested message, for whichever variant is set
+	switch x := m.Content.(type) {
+	case *PutArtifactRequest_Metadata:
+		b.EncodeVarint(1<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Metadata); err != nil {
+			return err
+		}
+	case *PutArtifactRequest_Data:
+		b.EncodeVarint(2<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Data); err != nil {
+			return err
+		}
+	case nil: // unset oneof: nothing is written
+	default:
+		return fmt.Errorf("PutArtifactRequest.Content has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _PutArtifactRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*PutArtifactRequest)
+	switch tag { // the bool result reports whether tag belongs to this oneof
+	case 1: // content.metadata
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ArtifactMetadata)
+		err := b.DecodeMessage(msg)
+		m.Content = &PutArtifactRequest_Metadata{msg} // assigned even when err is non-nil; err is returned alongside
+		return true, err
+	case 2: // content.data
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ArtifactChunk)
+		err := b.DecodeMessage(msg)
+		m.Content = &PutArtifactRequest_Data{msg} // assigned even when err is non-nil; err is returned alongside
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _PutArtifactRequest_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*PutArtifactRequest)
+	// content: size of the set variant = field key + length prefix + nested message bytes
+	switch x := m.Content.(type) {
+	case *PutArtifactRequest_Metadata:
+		s := proto.Size(x.Metadata)
+		n += proto.SizeVarint(1<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *PutArtifactRequest_Data:
+		s := proto.Size(x.Data)
+		n += proto.SizeVarint(2<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil: // unset oneof contributes zero bytes
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+type PutArtifactResponse struct { // declares no fields; returned by the PutArtifact stream when it completes
+}
+
+func (m *PutArtifactResponse) Reset()                    { *m = PutArtifactResponse{} }
+func (m *PutArtifactResponse) String() string            { return proto.CompactTextString(m) }
+func (*PutArtifactResponse) ProtoMessage()               {}
+func (*PutArtifactResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
+
+// A request to commit the manifest for a Job. All artifacts must have been successfully uploaded
+// before this call is made.
+type CommitManifestRequest struct {
+	// (Required) The manifest to commit.
+	Manifest *Manifest `protobuf:"bytes,1,opt,name=manifest" json:"manifest,omitempty"`
+}
+
+func (m *CommitManifestRequest) Reset()                    { *m = CommitManifestRequest{} }
+func (m *CommitManifestRequest) String() string            { return proto.CompactTextString(m) }
+func (*CommitManifestRequest) ProtoMessage()               {}
+func (*CommitManifestRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
+
+func (m *CommitManifestRequest) GetManifest() *Manifest {
+	if m != nil {
+		return m.Manifest
+	}
+	return nil // nil receiver: report no manifest
+}
+
+// The result of committing a manifest.
+type CommitManifestResponse struct {
+	// (Required) An opaque token representing the entirety of the staged artifacts.
+	StagingToken string `protobuf:"bytes,1,opt,name=staging_token,json=stagingToken" json:"staging_token,omitempty"`
+}
+
+func (m *CommitManifestResponse) Reset()                    { *m = CommitManifestResponse{} }
+func (m *CommitManifestResponse) String() string            { return proto.CompactTextString(m) }
+func (*CommitManifestResponse) ProtoMessage()               {}
+func (*CommitManifestResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
+
+func (m *CommitManifestResponse) GetStagingToken() string {
+	if m != nil {
+		return m.StagingToken
+	}
+	return "" // nil receiver: report the empty string
+}
+
+func init() { // registers each message type of this file under its fully-qualified proto name
+	proto.RegisterType((*ArtifactMetadata)(nil), "org.apache.beam.runner_api.v1.ArtifactMetadata")
+	proto.RegisterType((*Manifest)(nil), "org.apache.beam.runner_api.v1.Manifest")
+	proto.RegisterType((*ProxyManifest)(nil), "org.apache.beam.runner_api.v1.ProxyManifest")
+	proto.RegisterType((*ProxyManifest_Location)(nil), "org.apache.beam.runner_api.v1.ProxyManifest.Location")
+	proto.RegisterType((*GetManifestRequest)(nil), "org.apache.beam.runner_api.v1.GetManifestRequest")
+	proto.RegisterType((*GetManifestResponse)(nil), "org.apache.beam.runner_api.v1.GetManifestResponse")
+	proto.RegisterType((*GetArtifactRequest)(nil), "org.apache.beam.runner_api.v1.GetArtifactRequest")
+	proto.RegisterType((*ArtifactChunk)(nil), "org.apache.beam.runner_api.v1.ArtifactChunk")
+	proto.RegisterType((*PutArtifactRequest)(nil), "org.apache.beam.runner_api.v1.PutArtifactRequest")
+	proto.RegisterType((*PutArtifactResponse)(nil), "org.apache.beam.runner_api.v1.PutArtifactResponse")
+	proto.RegisterType((*CommitManifestRequest)(nil), "org.apache.beam.runner_api.v1.CommitManifestRequest")
+	proto.RegisterType((*CommitManifestResponse)(nil), "org.apache.beam.runner_api.v1.CommitManifestResponse")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for the ArtifactStagingService service.
+
+type ArtifactStagingServiceClient interface {
+	// Stage an artifact to be available during job execution. The first request must contain the
+	// name of the artifact. All future requests must contain sequential chunks of the content of
+	// the artifact.
+	PutArtifact(ctx context.Context, opts ...grpc.CallOption) (ArtifactStagingService_PutArtifactClient, error)
+	// Commit the manifest for a Job. All artifacts must have been successfully uploaded
+	// before this call is made.
+	//
+	// Throws error INVALID_ARGUMENT if not all of the members of the manifest are present.
+	CommitManifest(ctx context.Context, in *CommitManifestRequest, opts ...grpc.CallOption) (*CommitManifestResponse, error)
+}
+
+type artifactStagingServiceClient struct {
+	cc *grpc.ClientConn // connection used for every call and stream opened through this client
+}
+
+func NewArtifactStagingServiceClient(cc *grpc.ClientConn) ArtifactStagingServiceClient {
+	return &artifactStagingServiceClient{cc}
+}
+
+func (c *artifactStagingServiceClient) PutArtifact(ctx context.Context, opts ...grpc.CallOption) (ArtifactStagingService_PutArtifactClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_ArtifactStagingService_serviceDesc.Streams[0], c.cc, "/org.apache.beam.runner_api.v1.ArtifactStagingService/PutArtifact", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &artifactStagingServicePutArtifactClient{stream} // typed wrapper; nothing is sent until the caller calls Send
+	return x, nil
+}
+
+type ArtifactStagingService_PutArtifactClient interface { // client side of the PutArtifact stream: many Sends, then one CloseAndRecv
+	Send(*PutArtifactRequest) error
+	CloseAndRecv() (*PutArtifactResponse, error)
+	grpc.ClientStream
+}
+
+type artifactStagingServicePutArtifactClient struct {
+	grpc.ClientStream
+}
+
+func (x *artifactStagingServicePutArtifactClient) Send(m *PutArtifactRequest) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *artifactStagingServicePutArtifactClient) CloseAndRecv() (*PutArtifactResponse, error) {
+	if err := x.ClientStream.CloseSend(); err != nil { // close the sending side first
+		return nil, err
+	}
+	m := new(PutArtifactResponse)
+	if err := x.ClientStream.RecvMsg(m); err != nil { // then read the single response message
+		return nil, err
+	}
+	return m, nil
+}
+
+func (c *artifactStagingServiceClient) CommitManifest(ctx context.Context, in *CommitManifestRequest, opts ...grpc.CallOption) (*CommitManifestResponse, error) {
+	out := new(CommitManifestResponse)
+	err := grpc.Invoke(ctx, "/org.apache.beam.runner_api.v1.ArtifactStagingService/CommitManifest", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err // the response object is dropped on failure; callers get only the error
+	}
+	return out, nil
+}
+
+// Server API for the ArtifactStagingService service.
+
+type ArtifactStagingServiceServer interface {
+	// Stage an artifact to be available during job execution. The first request must contain the
+	// name of the artifact. All future requests must contain sequential chunks of the content of
+	// the artifact.
+	PutArtifact(ArtifactStagingService_PutArtifactServer) error
+	// Commit the manifest for a Job. All artifacts must have been successfully uploaded
+	// before this call is made.
+	//
+	// Throws error INVALID_ARGUMENT if not all of the members of the manifest are present.
+	CommitManifest(context.Context, *CommitManifestRequest) (*CommitManifestResponse, error)
+}
+
+func RegisterArtifactStagingServiceServer(s *grpc.Server, srv ArtifactStagingServiceServer) {
+	s.RegisterService(&_ArtifactStagingService_serviceDesc, srv)
+}
+
+func _ArtifactStagingService_PutArtifact_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(ArtifactStagingServiceServer).PutArtifact(&artifactStagingServicePutArtifactServer{stream}) // wrap the raw stream in the typed server stream
+}
+
+type ArtifactStagingService_PutArtifactServer interface { // server side of the PutArtifact stream: Recv requests, answer with SendAndClose
+	SendAndClose(*PutArtifactResponse) error
+	Recv() (*PutArtifactRequest, error)
+	grpc.ServerStream
+}
+
+type artifactStagingServicePutArtifactServer struct {
+	grpc.ServerStream
+}
+
+func (x *artifactStagingServicePutArtifactServer) SendAndClose(m *PutArtifactResponse) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *artifactStagingServicePutArtifactServer) Recv() (*PutArtifactRequest, error) {
+	m := new(PutArtifactRequest)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err // any receive error is returned as-is with no message
+	}
+	return m, nil
+}
+
+func _ArtifactStagingService_CommitManifest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(CommitManifestRequest)
+	if err := dec(in); err != nil { // dec fills in with the incoming request; a decode failure ends the call
+		return nil, err
+	}
+	if interceptor == nil { // no interceptor: call the service implementation directly
+		return srv.(ArtifactStagingServiceServer).CommitManifest(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/org.apache.beam.runner_api.v1.ArtifactStagingService/CommitManifest",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) { // passed to the interceptor, which invokes it with the request
+		return srv.(ArtifactStagingServiceServer).CommitManifest(ctx, req.(*CommitManifestRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _ArtifactStagingService_serviceDesc = grpc.ServiceDesc{ // one unary method (CommitManifest) and one client-streaming method (PutArtifact)
+	ServiceName: "org.apache.beam.runner_api.v1.ArtifactStagingService",
+	HandlerType: (*ArtifactStagingServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "CommitManifest",
+			Handler:    _ArtifactStagingService_CommitManifest_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{
+		{ // index 0: the client's PutArtifact refers to this entry as Streams[0]
+			StreamName:    "PutArtifact",
+			Handler:       _ArtifactStagingService_PutArtifact_Handler,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "beam_artifact_api.proto",
+}
+
+// Client API for the ArtifactRetrievalService service.
+
+type ArtifactRetrievalServiceClient interface {
+	// Get the manifest for the job.
+	GetManifest(ctx context.Context, in *GetManifestRequest, opts ...grpc.CallOption) (*GetManifestResponse, error)
+	// Get an artifact staged for the job. The requested artifact must be within the manifest.
+	GetArtifact(ctx context.Context, in *GetArtifactRequest, opts ...grpc.CallOption) (ArtifactRetrievalService_GetArtifactClient, error)
+}
+
+type artifactRetrievalServiceClient struct {
+	cc *grpc.ClientConn // connection used for every call and stream opened through this client
+}
+
+func NewArtifactRetrievalServiceClient(cc *grpc.ClientConn) ArtifactRetrievalServiceClient {
+	return &artifactRetrievalServiceClient{cc}
+}
+
+func (c *artifactRetrievalServiceClient) GetManifest(ctx context.Context, in *GetManifestRequest, opts ...grpc.CallOption) (*GetManifestResponse, error) {
+	out := new(GetManifestResponse)
+	err := grpc.Invoke(ctx, "/org.apache.beam.runner_api.v1.ArtifactRetrievalService/GetManifest", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err // the response object is dropped on failure; callers get only the error
+	}
+	return out, nil
+}
+
+func (c *artifactRetrievalServiceClient) GetArtifact(ctx context.Context, in *GetArtifactRequest, opts ...grpc.CallOption) (ArtifactRetrievalService_GetArtifactClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_ArtifactRetrievalService_serviceDesc.Streams[0], c.cc, "/org.apache.beam.runner_api.v1.ArtifactRetrievalService/GetArtifact", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &artifactRetrievalServiceGetArtifactClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type ArtifactRetrievalService_GetArtifactClient interface {
+	Recv() (*ArtifactChunk, error)
+	grpc.ClientStream
+}
+
+type artifactRetrievalServiceGetArtifactClient struct {
+	grpc.ClientStream
+}
+
+func (x *artifactRetrievalServiceGetArtifactClient) Recv() (*ArtifactChunk, error) {
+	m := new(ArtifactChunk)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// Server API for ArtifactRetrievalService service
+
+type ArtifactRetrievalServiceServer interface {
+	// Get the manifest for the job
+	GetManifest(context.Context, *GetManifestRequest) (*GetManifestResponse, error)
+	// Get an artifact staged for the job. The requested artifact must be within the manifest
+	GetArtifact(*GetArtifactRequest, ArtifactRetrievalService_GetArtifactServer) error
+}
+
+func RegisterArtifactRetrievalServiceServer(s *grpc.Server, srv ArtifactRetrievalServiceServer) {
+	s.RegisterService(&_ArtifactRetrievalService_serviceDesc, srv)
+}
+
+func _ArtifactRetrievalService_GetManifest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(GetManifestRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ArtifactRetrievalServiceServer).GetManifest(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/org.apache.beam.runner_api.v1.ArtifactRetrievalService/GetManifest",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ArtifactRetrievalServiceServer).GetManifest(ctx, req.(*GetManifestRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _ArtifactRetrievalService_GetArtifact_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(GetArtifactRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(ArtifactRetrievalServiceServer).GetArtifact(m, &artifactRetrievalServiceGetArtifactServer{stream})
+}
+
+type ArtifactRetrievalService_GetArtifactServer interface {
+	Send(*ArtifactChunk) error
+	grpc.ServerStream
+}
+
+type artifactRetrievalServiceGetArtifactServer struct {
+	grpc.ServerStream
+}
+
+func (x *artifactRetrievalServiceGetArtifactServer) Send(m *ArtifactChunk) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+var _ArtifactRetrievalService_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "org.apache.beam.runner_api.v1.ArtifactRetrievalService",
+	HandlerType: (*ArtifactRetrievalServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "GetManifest",
+			Handler:    _ArtifactRetrievalService_GetManifest_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "GetArtifact",
+			Handler:       _ArtifactRetrievalService_GetArtifact_Handler,
+			ServerStreams: true,
+		},
+	},
+	Metadata: "beam_artifact_api.proto",
+}
+
+func init() { proto.RegisterFile("beam_artifact_api.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+	// 540 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x95, 0x5f, 0x6b, 0xdb, 0x3c,
+	0x14, 0xc6, 0xeb, 0xe4, 0xe5, 0x9d, 0x73, 0xdc, 0x8c, 0xa0, 0xae, 0x9d, 0x09, 0x0c, 0x82, 0x3a,
+	0x58, 0x2e, 0x86, 0xd7, 0xa4, 0xcb, 0xe5, 0x2e, 0x9a, 0x5c, 0xac, 0xb0, 0x05, 0x3a, 0x77, 0x30,
+	0x28, 0x83, 0xa0, 0x3a, 0x6a, 0x2a, 0x52, 0x49, 0x9e, 0xac, 0x98, 0xed, 0x62, 0x37, 0xfb, 0x02,
+	0xfb, 0x2e, 0xfb, 0x36, 0xfb, 0x36, 0xc3, 0xb2, 0xec, 0xe5, 0x1f, 0x73, 0x03, 0xbd, 0x3b, 0x9c,
+	0x9c, 0xe7, 0xd1, 0xcf, 0xcf, 0x91, 0x08, 0x3c, 0xbd, 0xa6, 0x84, 0x4f, 0x88, 0xd2, 0xec, 0x86,
+	0x44, 0x7a, 0x42, 0x62, 0x16, 0xc4, 0x4a, 0x6a, 0x89, 0x9e, 0x49, 0x35, 0x0b, 0x48, 0x4c, 0xa2,
+	0x5b, 0x1a, 0x64, 0x33, 0x81, 0x5a, 0x08, 0x41, 0x95, 0x99, 0x48, 0x7b, 0xf8, 0x0a, 0x5a, 0x67,
+	0x56, 0x34, 0xa6, 0x9a, 0x4c, 0x89, 0x26, 0x08, 0xc1, 0x7f, 0x82, 0x70, 0xea, 0x3b, 0x1d, 0xa7,
+	0xdb, 0x08, 0x4d, 0x8d, 0x3a, 0xe0, 0xc5, 0x54, 0x71, 0x96, 0x24, 0x4c, 0x8a, 0xc4, 0xaf, 0x75,
+	0x9c, 0x6e, 0x33, 0x5c, 0x6e, 0xa1, 0x16, 0xd4, 0xf9, 0x74, 0xe0, 0xd7, 0x8d, 0x28, 0x2b, 0xf1,
+	0x27, 0x70, 0xc7, 0x44, 0xb0, 0x1b, 0x9a, 0x68, 0xf4, 0x0e, 0xdc, 0x02, 0xce, 0x77, 0x3a, 0xf5,
+	0xae, 0xd7, 0x7f, 0x15, 0xfc, 0x93, 0x2c, 0x58, 0xc7, 0x0a, 0x4b, 0x03, 0xfc, 0xdb, 0x81, 0xe6,
+	0x85, 0x92, 0x5f, 0xbf, 0x95, 0xf6, 0x23, 0x70, 0xb9, 0xad, 0x0d, 0xb6, 0xd7, 0x7f, 0x51, 0x61,
+	0x5f, 0x48, 0xc3, 0x52, 0x88, 0x3e, 0x80, 0x7b, 0x27, 0x23, 0xa2, 0x99, 0x14, 0x7e, 0xcd, 0x30,
+	0x0e, 0x2a, 0x4c, 0x56, 0x20, 0x82, 0xf7, 0x56, 0x1c, 0x96, 0x36, 0xed, 0x13, 0x70, 0x8b, 0xee,
+	0xd6, 0x58, 0x5b, 0x50, 0x5f, 0x28, 0x66, 0xe2, 0x6c, 0x84, 0x59, 0x89, 0x9f, 0x00, 0x7a, 0x4b,
+	0x75, 0x49, 0x47, 0xbf, 0x2c, 0x68, 0xa2, 0xf1, 0x15, 0x1c, 0xac, 0x74, 0x93, 0x58, 0x8a, 0x84,
+	0x3e, 0xc8, 0x67, 0xe3, 0xae, 0x39, 0xb1, 0x88, 0xdb, 0x9e, 0xb8, 0x8d, 0x16, 0x1f, 0x43, 0xb3,
+	0x18, 0x1b, 0xdd, 0x2e, 0xc4, 0x3c, 0x1b, 0xca, 0x56, 0x63, 0x86, 0xf6, 0x43, 0x53, 0xe3, 0x5f,
+	0x0e, 0xa0, 0x8b, 0xc5, 0x86, 0xdf, 0x18, 0x5c, 0x6e, 0x37, 0x69, 0x51, 0x77, 0xbd, 0x00, 0xe7,
+	0x7b, 0x61, 0x69, 0x81, 0x86, 0xf6, 0xe4, 0x9a, 0xb1, 0x7a, 0x79, 0x4f, 0x2b, 0x43, 0x7d, 0xbe,
+	0x97, 0x93, 0x0e, 0x1b, 0xf0, 0x28, 0x92, 0x42, 0x53, 0xa1, 0xf1, 0x21, 0x1c, 0xac, 0x30, 0xe7,
+	0xf9, 0xe2, 0xcf, 0x70, 0x38, 0x92, 0x9c, 0xb3, 0xf5, 0x7d, 0x3c, 0x4c, 0xf0, 0x6f, 0xe0, 0x68,
+	0xdd, 0xdd, 0xee, 0xf5, 0x18, 0x9a, 0x89, 0x26, 0x33, 0x26, 0x66, 0x13, 0x2d, 0xe7, 0x54, 0xd8,
+	0x2d, 0xec, 0xdb, 0xe6, 0xc7, 0xac, 0xd7, 0xff, 0x59, 0x83, 0xa3, 0x82, 0xf8, 0x32, 0xff, 0xe1,
+	0x92, 0xaa, 0x94, 0x45, 0x14, 0xa5, 0xe0, 0x2d, 0x7d, 0x0e, 0xea, 0x55, 0x5d, 0xe3, 0x8d, 0x75,
+	0xb5, 0xfb, 0xbb, 0x48, 0x72, 0xea, 0xae, 0x83, 0xbe, 0xc3, 0xe3, 0xd5, 0x2f, 0x42, 0xaf, 0x2b,
+	0x7c, 0xb6, 0xc6, 0xdb, 0x1e, 0xec, 0xa8, 0xca, 0x01, 0xfa, 0x3f, 0x6a, 0xe0, 0xff, 0xa5, 0xd2,
+	0x8a, 0xd1, 0x94, 0xdc, 0x15, 0x99, 0x68, 0xf0, 0x96, 0x9e, 0x50, 0x65, 0x26, 0x9b, 0x8f, 0xb0,
+	0x32, 0x93, 0x6d, 0x2f, 0x34, 0x36, 0xa7, 0xde, 0x7b, 0x13, 0x9b, 0x0f, 0xb1, 0xbd, 0xd3, 0xdd,
+	0x3e, 0x71, 0x86, 0xa7, 0xf0, 0x7c, 0x5d, 0x90, 0x4c, 0xe7, 0x41, 0x24, 0x39, 0x97, 0xc2, 0x6a,
+	0x83, 0xb4, 0x37, 0xf4, 0x0a, 0xe1, 0x59, 0xcc, 0xae, 0xff, 0x37, 0x7f, 0x16, 0xa7, 0x7f, 0x02,
+	0x00, 0x00, 0xff, 0xff, 0x72, 0x5b, 0xc8, 0xd5, 0x47, 0x06, 0x00, 0x00,
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/provision/provision_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/provision/provision_test.go b/sdks/go/pkg/beam/provision/provision_test.go
new file mode 100644
index 0000000..f29bc9b
--- /dev/null
+++ b/sdks/go/pkg/beam/provision/provision_test.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package provision
+
+import (
+	"reflect"
+	"testing"
+)
+
+type s struct {
+	A int    `json:"a,omitempty"`
+	B string `json:"b,omitempty"`
+	C bool   `json:"c,omitempty"`
+	D *s     `json:"d,omitempty"`
+}
+
+// TestConversions verifies that we can process proto structs via JSON: each
+// value is converted with OptionsToProto, converted back with ProtoToOptions
+// and compared against the original.
+func TestConversions(t *testing.T) {
+	tests := []s{
+		{},
+		{A: 2},
+		{B: "foo"},
+		{C: true},
+		{D: &s{A: 3}},
+		{A: 1, B: "bar", C: true, D: &s{A: 3, B: "baz"}},
+	}
+
+	for _, test := range tests {
+		enc, err := OptionsToProto(test)
+		if err != nil {
+			t.Errorf("Failed to marshal %v: %v", test, err)
+			// enc is unusable after a failed marshal; skip the round trip.
+			continue
+		}
+		var ret s
+		if err := ProtoToOptions(enc, &ret); err != nil {
+			t.Errorf("Failed to unmarshal %v from %v: %v", test, enc, err)
+			// ret may be partially populated; comparing it would only add noise.
+			continue
+		}
+		if !reflect.DeepEqual(test, ret) {
+			t.Errorf("Unmarshal(Marshal(%v)) = %v, want %v", test, ret, test)
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/provision/provison.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/provision/provison.go b/sdks/go/pkg/beam/provision/provison.go
new file mode 100644
index 0000000..efc418f
--- /dev/null
+++ b/sdks/go/pkg/beam/provision/provison.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package provision contains utilities for obtaining runtime provision
+// information -- such as pipeline options.
+package provision
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"time"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_fn_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/golang/protobuf/jsonpb"
+	google_protobuf "github.com/golang/protobuf/ptypes/struct"
+)
+
+// Info returns the runtime provisioning info for the worker. It dials the
+// provision service at the given endpoint (with a 2 minute dial timeout),
+// issues a single GetProvisionInfo request and closes the connection before
+// returning. An error is returned if the dial or the request fails.
+func Info(ctx context.Context, endpoint string) (*pb.ProvisionInfo, error) {
+	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close()
+
+	client := pb.NewProvisionServiceClient(cc)
+
+	resp, err := client.GetProvisionInfo(ctx, &pb.GetProvisionInfoRequest{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get provision info: %v", err)
+	}
+	return resp.GetInfo(), nil
+}
+
+// OptionsToProto converts pipeline options to a proto struct via JSON.
+func OptionsToProto(v interface{}) (*google_protobuf.Struct, error) {
+	data, err := json.Marshal(v)
+	if err != nil {
+		return nil, err
+	}
+	return JSONToProto(string(data))
+}
+
+// JSONToProto converts JSON-encoded pipeline options to a proto struct. It
+// returns an error if data cannot be unmarshalled into a Struct.
+func JSONToProto(data string) (*google_protobuf.Struct, error) {
+	var out google_protobuf.Struct
+	// data is already a string, so it is passed through without conversion.
+	if err := jsonpb.UnmarshalString(data, &out); err != nil {
+		return nil, err
+	}
+	return &out, nil
+}
+
+// ProtoToOptions converts pipeline options from a proto struct via JSON.
+func ProtoToOptions(opt *google_protobuf.Struct, v interface{}) error {
+	data, err := ProtoToJSON(opt)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal([]byte(data), v)
+}
+
+// ProtoToJSON converts pipeline options from a proto struct to JSON.
+func ProtoToJSON(opt *google_protobuf.Struct) (string, error) {
+	return (&jsonpb.Marshaler{}).MarshalToString(opt)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/util/errorx/guarded.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/errorx/guarded.go b/sdks/go/pkg/beam/util/errorx/guarded.go
new file mode 100644
index 0000000..cc0b07b
--- /dev/null
+++ b/sdks/go/pkg/beam/util/errorx/guarded.go
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package errorx contains utilities for handling errors.
+package errorx
+
+import "sync"
+
+// GuardedError is a concurrency-safe error wrapper. It is sticky
+// in that the first error won't be overwritten.
+type GuardedError struct {
+	err error
+	mu  sync.Mutex
+}
+
+// Error returns the guarded error.
+func (g *GuardedError) Error() error {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	return g.err
+}
+
+// TrySetError sets the error, if not already set. Returns true iff the
+// error was set.
+func (g *GuardedError) TrySetError(err error) bool {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	upd := g.err == nil
+	if upd {
+		g.err = err
+	}
+	return upd
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/util/execx/exec.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/execx/exec.go b/sdks/go/pkg/beam/util/execx/exec.go
new file mode 100644
index 0000000..b4978ef
--- /dev/null
+++ b/sdks/go/pkg/beam/util/execx/exec.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package execx contains wrappers and utilities for the exec package.
+package execx
+
+import (
+	"os"
+	"os/exec"
+)
+
+// Execute runs the program with the given arguments. It attaches stdio to the
+// child process.
+func Execute(prog string, args ...string) error {
+	cmd := exec.Command(prog, args...)
+	cmd.Stdin = os.Stdin
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+
+	return cmd.Run()
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/util/gcsx/gcs.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go b/sdks/go/pkg/beam/util/gcsx/gcs.go
new file mode 100644
index 0000000..2e04be0
--- /dev/null
+++ b/sdks/go/pkg/beam/util/gcsx/gcs.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package gcsx contains utilities for working with Google Cloud Storage (GCS).
+package gcsx
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/url"
+
+	"golang.org/x/oauth2/google"
+	"google.golang.org/api/storage/v1"
+)
+
+// NewClient creates a new GCS client with default application credentials.
+func NewClient(ctx context.Context, scope string) (*storage.Service, error) {
+	cl, err := google.DefaultClient(ctx, scope)
+	if err != nil {
+		return nil, err
+	}
+	return storage.New(cl)
+}
+
+// WriteObject writes the given content to the specified object. If the object
+// already exists, it is overwritten.
+func WriteObject(client *storage.Service, bucket, object string, r io.Reader) error {
+	obj := &storage.Object{
+		Name:   object,
+		Bucket: bucket,
+	}
+	_, err := client.Objects.Insert(bucket, obj).Media(r).Do()
+	return err
+}
+
+// ReadObject reads the content of the given object in full. The whole object
+// is buffered in memory and the download response body is closed before
+// returning.
+func ReadObject(client *storage.Service, bucket, object string) ([]byte, error) {
+	resp, err := client.Objects.Get(bucket, object).Download()
+	if err != nil {
+		return nil, err
+	}
+	// The caller of Download owns the body; close it so the underlying
+	// connection is released.
+	defer resp.Body.Close()
+
+	return ioutil.ReadAll(resp.Body)
+}
+
+// MakeObject creates an object location from bucket and path. For example,
+// MakeObject("foo", "bar/baz") returns "gs://foo/bar/baz". The bucket
+// must be non-empty; MakeObject panics otherwise.
+func MakeObject(bucket, path string) string {
+	if bucket == "" {
+		panic("bucket must be non-empty")
+	}
+	return fmt.Sprintf("gs://%v/%v", bucket, path)
+}
+
+// ParseObject deconstructs a GCS object name into (bucket, path). It returns
+// an error if the name is not a valid URL, does not use the "gs" scheme or
+// has no bucket. A name without a path yields an empty path.
+//
+// NOTE(review): the name is parsed with url.Parse and only the URL path is
+// returned, so any "?query" or "#fragment" part would not be included --
+// confirm object names never contain these characters.
+func ParseObject(object string) (bucket, path string, err error) {
+	parsed, err := url.Parse(object)
+	if err != nil {
+		return "", "", err
+	}
+
+	if parsed.Scheme != "gs" {
+		return "", "", fmt.Errorf("object %s must have 'gs' scheme", object)
+	}
+	if parsed.Host == "" {
+		return "", "", fmt.Errorf("object %s must have bucket", object)
+	}
+	if parsed.Path == "" {
+		return parsed.Host, "", nil
+	}
+
+	// remove leading "/" in URL path
+	return parsed.Host, parsed.Path[1:], nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/util/grpcx/dial.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/grpcx/dial.go b/sdks/go/pkg/beam/util/grpcx/dial.go
new file mode 100644
index 0000000..8467ace
--- /dev/null
+++ b/sdks/go/pkg/beam/util/grpcx/dial.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcx
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"google.golang.org/grpc"
+)
+
+// Dial is a convenience wrapper over grpc.Dial that specifies an insecure,
+// blocking connection with a timeout.
+func Dial(ctx context.Context, endpoint string, timeout time.Duration) (*grpc.ClientConn, error) {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	cc, err := grpc.DialContext(ctx, endpoint, grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		return nil, fmt.Errorf("failed to dial server at %v: %v", endpoint, err)
+	}
+	return cc, nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/util/grpcx/metadata.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/grpcx/metadata.go b/sdks/go/pkg/beam/util/grpcx/metadata.go
new file mode 100644
index 0000000..08512c7
--- /dev/null
+++ b/sdks/go/pkg/beam/util/grpcx/metadata.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package grpcx contains utilities for working with gRPC.
+package grpcx
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"google.golang.org/grpc/metadata"
+)
+
+const idKey = "id"
+
+// ReadWorkerId reads the worker ID from an incoming gRPC request context. It
+// returns an error if the context carries no incoming metadata, or if the
+// metadata does not hold exactly one value under the "id" key.
+func ReadWorkerId(ctx context.Context) (string, error) {
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		return "", errors.New("failed to read metadata from context")
+	}
+	id, ok := md[idKey]
+	if !ok || len(id) < 1 {
+		return "", fmt.Errorf("failed to find worker id in metadata %v", md)
+	}
+	if len(id) > 1 {
+		return "", fmt.Errorf("multiple worker ids in metadata: %v", id)
+	}
+	return id[0], nil
+}
+
+// WriteWorkerId writes the worker ID to an outgoing gRPC request context. It
+// merges the information with any existing outgoing gRPC metadata and
+// returns the derived context.
+func WriteWorkerId(ctx context.Context, id string) context.Context {
+	md := metadata.New(map[string]string{
+		idKey: id,
+	})
+	if old, ok := metadata.FromOutgoingContext(ctx); ok {
+		md = metadata.Join(md, old)
+	}
+	return metadata.NewOutgoingContext(ctx, md)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/go/pom.xml b/sdks/go/pom.xml
new file mode 100644
index 0000000..c072b9f
--- /dev/null
+++ b/sdks/go/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-go</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Go</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-pkg-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.dir}/pkg</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>pkg</directory>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>cmd</directory>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- export pkg/ sources as zip for inclusion elsewhere -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>export-go-pkg-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <descriptors>
+            <descriptor>descriptor.xml</descriptor>
+          </descriptors>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>google.golang.org/api/storage/v1</package>
+                <package>github.com/spf13/cobra</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/beamctl</package>
+              </packages>
+              <resultName>beamctl</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/beamctl</package>
+              </packages>
+              <resultName>linux_amd64/beamctl</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <phase>test</phase>
+            <configuration>
+              <packages>
+                <folder>./...</folder>
+              </packages>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/java/container/Dockerfile
----------------------------------------------------------------------
diff --git a/sdks/java/container/Dockerfile b/sdks/java/container/Dockerfile
new file mode 100644
index 0000000..7fb325d
--- /dev/null
+++ b/sdks/java/container/Dockerfile
@@ -0,0 +1,28 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM openjdk:8
+MAINTAINER "Apache Beam <de...@beam.apache.org>"
+
+ADD target/slf4j-api.jar /opt/apache/beam/jars/
+ADD target/slf4j-jdk14.jar /opt/apache/beam/jars/
+ADD target/beam-sdks-java-harness.jar /opt/apache/beam/jars/
+
+ADD target/linux_amd64/boot /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/boot"]

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/java/container/boot.go
----------------------------------------------------------------------
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
new file mode 100644
index 0000000..8c465c3
--- /dev/null
+++ b/sdks/java/container/boot.go
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// boot is the boot code for the Java SDK harness container. It is responsible
+// for retrieving staged files and invoking the JVM correctly.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/provision"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+var (
+	// Command-line flags. Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+	id                = flag.String("id", "", "Local identifier (required).")
+	loggingEndpoint   = flag.String("logging_endpoint", "", "Logging endpoint (required).")
+	artifactEndpoint  = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
+	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
+	controlEndpoint   = flag.String("control_endpoint", "", "Control endpoint (required).")
+	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+func main() { // Validates flags, fetches pipeline options and staged artifacts, then runs the Java harness.
+	flag.Parse()
+	if *id == "" { // Every flag marked "(required)" must be non-empty; otherwise exit via log.Fatal.
+		log.Fatal("No id provided.")
+	}
+	if *loggingEndpoint == "" {
+		log.Fatal("No logging endpoint provided.")
+	}
+	if *artifactEndpoint == "" {
+		log.Fatal("No artifact endpoint provided.")
+	}
+	if *provisionEndpoint == "" {
+		log.Fatal("No provision endpoint provided.")
+	}
+	if *controlEndpoint == "" {
+		log.Fatal("No control endpoint provided.")
+	}
+
+	log.Printf("Initializing java harness: %v", strings.Join(os.Args, " "))
+
+	ctx := grpcx.WriteWorkerId(context.Background(), *id) // Presumably tags outgoing calls with this worker's id; confirm in grpcx.
+
+	// (1) Obtain the pipeline options from the provision endpoint.
+
+	info, err := provision.Info(ctx, *provisionEndpoint)
+	if err != nil {
+		log.Fatalf("Failed to obtain provisioning information: %v", err)
+	}
+	options, err := provision.ProtoToJSON(info.GetPipelineOptions()) // The result is exported as PIPELINE_OPTIONS below.
+	if err != nil {
+		log.Fatalf("Failed to convert pipeline options: %v", err)
+	}
+
+	// (2) Retrieve the staged user jars.
+
+	dir := filepath.Join(*semiPersistDir, "staged") // Artifacts are materialized under <semi_persist_dir>/staged.
+
+	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, dir)
+	if err != nil {
+		log.Fatalf("Failed to retrieve staged files: %v", err)
+	}
+
+	// (3) Invoke the Java harness, preserving artifact ordering in classpath.
+
+	os.Setenv("PIPELINE_OPTIONS", options) // NOTE(review): the error results of these os.Setenv calls are ignored.
+	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", fmt.Sprintf("id: \"1\"\nurl: \"%v\"\n", *loggingEndpoint))
+	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", fmt.Sprintf("id: \"2\"\nurl: \"%v\"\n", *controlEndpoint))
+
+	const jarsDir = "/opt/apache/beam/jars"
+	cp := []string{ // Fixed harness jars come first; staged artifacts are appended after them.
+		filepath.Join(jarsDir, "slf4j-api.jar"),
+		filepath.Join(jarsDir, "slf4j-jdk14.jar"),
+		filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
+	}
+	for _, md := range artifacts { // Appended in the order returned by Materialize.
+		cp = append(cp, filepath.Join(dir, filepath.FromSlash(md.Name))) // md.Name is treated as a slash-separated relative path.
+	}
+
+	args := []string{
+		"-cp", strings.Join(cp, ":"), // ":" is the classpath separator on Unix-like systems.
+		"org.apache.beam.fn.harness.FnHarness",
+	}
+	log.Printf("Executing: java %v", strings.Join(args, " "))
+
+	log.Fatalf("Java exited: %v", execx.Execute("java", args...)) // log.Fatalf exits the process once Execute returns.
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/java/container/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/container/pom.xml b/sdks/java/container/pom.xml
new file mode 100644
index 0000000..dd970a4
--- /dev/null
+++ b/sdks/java/container/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-container</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Java :: Container</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/boot</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+
+          <execution>
+            <id>copy-dependent-jars</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.basedir}/target</outputDirectory>
+              <stripVersion>true</stripVersion>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-api</artifactId>
+                  <overWrite>true</overWrite>
+                </artifactItem>
+                <artifactItem>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-jdk14</artifactId>
+                  <version>${slf4j.version}</version>
+                  <overWrite>true</overWrite>
+                </artifactItem>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-java-harness</artifactId>
+                  <overWrite>true</overWrite>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>google.golang.org/api/storage/v1</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/boot</package>
+              </packages>
+              <resultName>boot</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/boot</package>
+              </packages>
+              <resultName>linux_amd64/boot</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/java</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index fe5c2f1..c8ac651 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -32,6 +32,71 @@
   <description>This contains the SDK Fn Harness for Beam Java</description>
 
   <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-shade-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>bundle-and-repackage</id>
+              <phase>package</phase>
+              <goals>
+                <goal>shade</goal>
+              </goals>
+              <configuration>
+                <shadeTestJar>true</shadeTestJar>
+                <artifactSet>
+                  <includes>
+                    <include>com.google.guava:guava</include>
+                    <!-- java harness dependencies that are not staged -->
+                    <include>org.apache.beam:beam-runners-core-construction-java</include>
+                    <include>org.apache.beam:beam-runners-core-java</include>
+                    <include>org.apache.beam:beam-runners-google-cloud-dataflow-java</include>
+                    <include>org.apache.beam:beam-sdks-common-runner-api</include>
+                    <include>org.apache.beam:beam-sdks-common-fn-api</include>
+                  </includes>
+                </artifactSet>
+                <filters>
+                  <filter>
+                    <artifact>*:*</artifact>
+                    <excludes>
+                      <exclude>META-INF/*.SF</exclude>
+                      <exclude>META-INF/*.DSA</exclude>
+                      <exclude>META-INF/*.RSA</exclude>
+                    </excludes>
+                  </filter>
+                </filters>
+                <relocations>
+                  <relocation>
+                    <pattern>com.google.common</pattern>
+                    <excludes>
+                      <!-- com.google.common is too generic, need to exclude guava-testlib -->
+                      <exclude>com.google.common.**.testing.*</exclude>
+                    </excludes>
+                    <!--suppress MavenModelInspection -->
+                    <shadedPattern>
+                      org.apache.beam.sdks.harness.repackaged.com.google.common
+                    </shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>com.google.thirdparty</pattern>
+                    <!--suppress MavenModelInspection -->
+                    <shadedPattern>
+                      org.apache.beam.sdks.harness.repackaged.com.google.thirdparty
+                    </shadedPattern>
+                  </relocation>
+                </relocations>
+                <transformers>
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                </transformers>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
     <plugins>
       <plugin>
         <!--  Override Beam parent to allow Java8 -->
@@ -87,6 +152,7 @@
     </dependency>
 
     <dependency>
+      <!-- TODO: BEAM-2566 remove this dependency -->
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index f0cf8d9..12d09b3 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -54,6 +54,7 @@
       </activation>
       <modules>
         <module>harness</module>
+        <module>container</module>
         <module>java8tests</module>
       </modules>
     </profile>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/pom.xml b/sdks/pom.xml
index aec8762..0de5648 100644
--- a/sdks/pom.xml
+++ b/sdks/pom.xml
@@ -34,6 +34,7 @@
 
   <modules>
     <module>common</module>
+    <module>go</module>
     <module>java</module>
     <module>python</module>
   </modules>


[2/3] beam git commit: [BEAM-2877][BEAM-2881] Add Java SDK harness container image and support

Posted by tg...@apache.org.
[BEAM-2877][BEAM-2881] Add Java SDK harness container image and support

 * Add support for building Go code and docker container images with
   maven (see sdks/go/BUILD.md for details). The latter is only done
   if the "build-containers" profile is used.
 * Add GCS proxy service for managing artifacts in GCS.
 * Add GCE md service for metadata-configured provision info in GCE.
 * Add beamctl tool for manually interacting with these services.

This PR is focused on the execution side and would need support from
the submission side as well to be functional. The ULR will likely be
the first runner to tie everything together. The contents of the java
image are kept simple for now.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c18f15cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c18f15cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c18f15cd

Branch: refs/heads/master
Commit: c18f15cdf7eeabcaf64a2c808372683c3d823d4d
Parents: ed00299
Author: Henning Rohde <he...@google.com>
Authored: Sat Sep 30 22:36:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Oct 6 08:48:18 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  49 ++
 runners/gcp/gcemd/Dockerfile                    |  30 +
 runners/gcp/gcemd/main.go                       |  85 +++
 runners/gcp/gcemd/pom.xml                       | 154 ++++
 runners/gcp/gcsproxy/Dockerfile                 |  30 +
 runners/gcp/gcsproxy/main.go                    |  91 +++
 runners/gcp/gcsproxy/pom.xml                    | 154 ++++
 runners/gcp/pom.xml                             |  38 +
 runners/pom.xml                                 |   1 +
 .../src/main/proto/beam_provision_api.proto     |   8 +-
 .../src/main/proto/beam_artifact_api.proto      |  10 +
 sdks/go/BUILD.md                                |  63 ++
 sdks/go/cmd/beamctl/artifact.go                 |  98 +++
 sdks/go/cmd/beamctl/main.go                     |  64 ++
 sdks/go/descriptor.xml                          |  29 +
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 155 ++++
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go   | 200 +++++
 sdks/go/pkg/beam/artifact/materialize.go        | 240 ++++++
 sdks/go/pkg/beam/artifact/materialize_test.go   | 238 ++++++
 sdks/go/pkg/beam/artifact/server_test.go        | 212 ++++++
 sdks/go/pkg/beam/artifact/stage.go              | 238 ++++++
 sdks/go/pkg/beam/artifact/stage_test.go         |  98 +++
 sdks/go/pkg/beam/model/gen.go                   |  21 +
 .../beam_provision_api.pb.go                    | 219 ++++++
 .../beam_artifact_api.pb.go                     | 729 +++++++++++++++++++
 sdks/go/pkg/beam/provision/provision_test.go    |  54 ++
 sdks/go/pkg/beam/provision/provison.go          |  80 ++
 sdks/go/pkg/beam/util/errorx/guarded.go         |  47 ++
 sdks/go/pkg/beam/util/execx/exec.go             |  33 +
 sdks/go/pkg/beam/util/gcsx/gcs.go               |  88 +++
 sdks/go/pkg/beam/util/grpcx/dial.go             |  37 +
 sdks/go/pkg/beam/util/grpcx/metadata.go         |  55 ++
 sdks/go/pom.xml                                 | 163 +++++
 sdks/java/container/Dockerfile                  |  28 +
 sdks/java/container/boot.go                     | 111 +++
 sdks/java/container/pom.xml                     | 184 +++++
 sdks/java/harness/pom.xml                       |  66 ++
 sdks/java/pom.xml                               |   1 +
 sdks/pom.xml                                    |   1 +
 39 files changed, 4201 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d9c2e6d..42671e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,9 @@
     <kubectl>kubectl</kubectl>
     <!-- the standard location for kubernete's config file -->
     <kubeconfig>${user.home}/.kube/config</kubeconfig>
+
+    <!-- For container builds, override to push containers to a registry -->
+    <docker-repository-root>${user.name}</docker-repository-root>
   </properties>
 
   <packaging>pom</packaging>
@@ -364,6 +367,35 @@
         </pluginManagement>
       </build>
     </profile>
+
+    <profile>
+      <id>build-containers</id>
+      <build>
+        <!-- TODO(BEAM-2878): enable container build for releases -->
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>com.spotify</groupId>
+              <artifactId>dockerfile-maven-plugin</artifactId>
+              <executions>
+                <execution>
+                  <id>default</id>
+                  <goals>
+                    <goal>build</goal>
+                    <goal>push</goal>
+                  </goals>
+                  <configuration>
+                    <tag>latest</tag>
+                    <noCache>true</noCache>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
+
   </profiles>
 
   <dependencyManagement>
@@ -1855,6 +1887,23 @@
             </execution>
           </executions>
         </plugin>
+
+        <plugin>
+          <groupId>com.igormaznitsa</groupId>
+          <artifactId>mvn-golang-wrapper</artifactId>
+          <version>2.1.6</version>
+          <extensions>true</extensions>
+          <configuration>
+            <goVersion>1.9</goVersion>
+          </configuration>
+        </plugin>
+
+        <plugin>
+          <groupId>com.spotify</groupId>
+          <artifactId>dockerfile-maven-plugin</artifactId>
+          <version>1.3.5</version>
+          <!-- no executions by default. Use build-containers profile -->
+        </plugin>
       </plugins>
     </pluginManagement>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/Dockerfile
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/Dockerfile b/runners/gcp/gcemd/Dockerfile
new file mode 100644
index 0000000..b8fa8aa
--- /dev/null
+++ b/runners/gcp/gcemd/Dockerfile
@@ -0,0 +1,30 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM debian:stretch
+MAINTAINER "Apache Beam <de...@beam.apache.org>"
+
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y \
+        ca-certificates \
+        && \
+    rm -rf /var/lib/apt/lists/*
+
+ADD target/linux_amd64/gcemd /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/gcemd"]

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/main.go
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/main.go b/runners/gcp/gcemd/main.go
new file mode 100644
index 0000000..66b049e
--- /dev/null
+++ b/runners/gcp/gcemd/main.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// gcemd is a metadata-configured provisioning server for GCE.
+package main
+
+import (
+	"flag"
+	"log"
+	"net"
+
+	"cloud.google.com/go/compute/metadata"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_fn_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/provision"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+var (
+	endpoint = flag.String("endpoint", "", "Server endpoint to expose.") // Required; used as the TCP listen address in main.
+)
+
+func main() { // Reads job info from GCE instance attributes and serves it over gRPC on *endpoint.
+	flag.Parse()
+	if *endpoint == "" {
+		log.Fatal("No endpoint provided. Use --endpoint=localhost:12345")
+	}
+	if !metadata.OnGCE() { // Exit early when not on GCE, before the instance attribute lookups below.
+		log.Fatal("Not running on GCE")
+	}
+
+	log.Printf("Starting provisioning server on %v", *endpoint)
+
+	jobID, err := metadata.InstanceAttributeValue("job_id") // Each attribute lookup below is fatal on error.
+	if err != nil {
+		log.Fatalf("Failed to find job ID: %v", err)
+	}
+	jobName, err := metadata.InstanceAttributeValue("job_name")
+	if err != nil {
+		log.Fatalf("Failed to find job name: %v", err)
+	}
+	opt, err := metadata.InstanceAttributeValue("sdk_pipeline_options")
+	if err != nil {
+		log.Fatalf("Failed to find SDK pipeline options: %v", err)
+	}
+	options, err := provision.JSONToProto(opt) // Parses the attribute text into the value used for PipelineOptions.
+	if err != nil {
+		log.Fatalf("Failed to parse SDK pipeline options: %v", err)
+	}
+
+	info := &pb.ProvisionInfo{ // Built once at startup; GetProvisionInfo returns this same value on every call.
+		JobId:           jobID,
+		JobName:         jobName,
+		PipelineOptions: options,
+	}
+
+	gs := grpc.NewServer()
+	pb.RegisterProvisionServiceServer(gs, &server{info: info})
+
+	listener, err := net.Listen("tcp", *endpoint)
+	if err != nil {
+		log.Fatalf("Failed to listen to %v: %v", *endpoint, err)
+	}
+	log.Fatalf("Server failed: %v", gs.Serve(listener)) // The process exits via log.Fatalf whenever Serve returns.
+}
+
+type server struct { // server is the provision service implementation registered in main.
+	info *pb.ProvisionInfo // Provision info handed back by GetProvisionInfo.
+}
+
+func (s *server) GetProvisionInfo(ctx context.Context, req *pb.GetProvisionInfoRequest) (*pb.GetProvisionInfoResponse, error) { // ctx and req are unused.
+	return &pb.GetProvisionInfoResponse{Info: s.info}, nil // Always succeeds, wrapping the info stored on the server.
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/pom.xml b/runners/gcp/gcemd/pom.xml
new file mode 100644
index 0000000..377e3e0
--- /dev/null
+++ b/runners/gcp/gcemd/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-gcp-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-gcemd</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform :: GCE metadata provisioning</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/gcemd</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>cloud.google.com/go/compute/metadata</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcemd</package>
+              </packages>
+              <resultName>gcemd</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcemd</package>
+              </packages>
+              <resultName>linux_amd64/gcemd</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/gcemd</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/Dockerfile
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/Dockerfile b/runners/gcp/gcsproxy/Dockerfile
new file mode 100644
index 0000000..5ff9141
--- /dev/null
+++ b/runners/gcp/gcsproxy/Dockerfile
@@ -0,0 +1,30 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM debian:stretch
+MAINTAINER "Apache Beam <de...@beam.apache.org>"
+
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y \
+        ca-certificates \
+        && \
+    rm -rf /var/lib/apt/lists/*
+
+ADD target/linux_amd64/gcsproxy /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/gcsproxy"]

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/main.go
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/main.go b/runners/gcp/gcsproxy/main.go
new file mode 100644
index 0000000..ec63032
--- /dev/null
+++ b/runners/gcp/gcsproxy/main.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// gcsproxy is an artifact server backed by GCS and can run in either retrieval
+// (read) or staging (write) mode.
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"net"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact/gcsproxy"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"google.golang.org/grpc"
+)
+
+const (
+	retrieve = "retrieve" // --mode value selecting the retrieval (read) proxy
+	stage    = "stage"    // --mode value selecting the staging (write) proxy
+)
+
+var (
+	mode     = flag.String("mode", retrieve, "Proxy mode: retrieve or stage.") // defaults to retrieval; other values are fatal in main
+	endpoint = flag.String("endpoint", "", "Server endpoint to expose.")       // required; main exits if it is empty
+	manifest = flag.String("manifest", "", "Location of proxy manifest.")      // required; main exits if it is empty
+)
+
+func main() {
+	flag.Parse()
+	if *manifest == "" {
+		log.Fatal("No proxy manifest location provided. Use --manifest=gs://foo/bar")
+	}
+	if *endpoint == "" {
+		log.Fatal("No endpoint provided. Use --endpoint=localhost:12345")
+	}
+	// Both modes share one gRPC server; the switch below registers the selected service on it.
+	gs := grpc.NewServer()
+
+	switch *mode {
+	case retrieve:
+		// Retrieval mode. We download the manifest -- but not the
+		// artifacts -- eagerly.
+
+		log.Printf("Starting retrieval proxy from %v on %v", *manifest, *endpoint)
+
+		md, err := gcsproxy.ReadProxyManifest(context.Background(), *manifest)
+		if err != nil {
+			log.Fatalf("Failed to obtain proxy manifest %v: %v", *manifest, err)
+		}
+		proxy, err := gcsproxy.NewRetrievalServer(md)
+		if err != nil {
+			log.Fatalf("Failed to create artifact server: %v", err)
+		}
+		pb.RegisterArtifactRetrievalServiceServer(gs, proxy)
+
+	case stage:
+		// Staging proxy. We update the blobs next to the manifest
+		// in a blobs "directory".
+
+		log.Printf("Starting staging proxy to %v on %v", *manifest, *endpoint)
+
+		proxy, err := gcsproxy.NewStagingServer(*manifest)
+		if err != nil {
+			log.Fatalf("Failed to create artifact server: %v", err)
+		}
+		pb.RegisterArtifactStagingServiceServer(gs, proxy)
+
+	default:
+		log.Fatalf("Invalid mode: '%v', want '%v' or '%v'", *mode, retrieve, stage)
+	}
+	// The listener is opened only after the selected service has been set up without error.
+	listener, err := net.Listen("tcp", *endpoint)
+	if err != nil {
+		log.Fatalf("Failed to listen to %v: %v", *endpoint, err)
+	}
+	log.Fatalf("Server failed: %v", gs.Serve(listener)) // any return from Serve ends the process
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/pom.xml b/runners/gcp/gcsproxy/pom.xml
new file mode 100644
index 0000000..35be16e
--- /dev/null
+++ b/runners/gcp/gcsproxy/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-gcp-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-gcsproxy</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform :: GCS artifact proxy</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/gcsproxy</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>google.golang.org/api/storage/v1</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcsproxy</package>
+              </packages>
+              <resultName>gcsproxy</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcsproxy</package>
+              </packages>
+              <resultName>linux_amd64/gcsproxy</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/gcsproxy</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/pom.xml b/runners/gcp/pom.xml
new file mode 100644
index 0000000..d900212
--- /dev/null
+++ b/runners/gcp/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-parent</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform</name>
+
+  <modules>
+    <module>gcemd</module>
+    <module>gcsproxy</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index e0a47bd..a9c33d7 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -42,6 +42,7 @@
     <module>google-cloud-dataflow-java</module>
     <module>spark</module>
     <module>apex</module>
+    <module>gcp</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
index fb4f252..b0cd6b4 100644
--- a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
@@ -31,7 +31,7 @@ option java_outer_classname = "ProvisionApi";
 import "google/protobuf/struct.proto";
 
 // A service to provide runtime provisioning information to the SDK harness
-// worker instances -- such as pipeline options, resource constaints and
+// worker instances -- such as pipeline options, resource constraints and
 // other job metadata -- needed by an SDK harness instance to initialize.
 service ProvisionService {
     // Get provision information for the SDK harness worker instance.
@@ -43,6 +43,12 @@ message GetProvisionInfoRequest { }
 
 // A response containing the provision info of a SDK harness worker instance.
 message GetProvisionInfoResponse {
+    ProvisionInfo info = 1;
+}
+
+// Runtime provisioning information for a SDK harness worker instance,
+// such as pipeline options, resource constraints and other job metadata.
+message ProvisionInfo {
     // (required) The job ID.
     string job_id = 1;
     // (required) The job name.

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
index 12b0217..e11551c 100644
--- a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
@@ -72,6 +72,16 @@ message Manifest {
   repeated ArtifactMetadata artifact = 1;
 }
 
+// A manifest with location information.
+message ProxyManifest {
+  Manifest manifest = 1;
+  message Location {
+     string name = 1;
+     string uri = 2;
+  }
+  repeated Location location = 2;
+}
+
 // A request to get the manifest of a Job.
 message GetManifestRequest {}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/BUILD.md
----------------------------------------------------------------------
diff --git a/sdks/go/BUILD.md b/sdks/go/BUILD.md
new file mode 100644
index 0000000..1bbfdf0
--- /dev/null
+++ b/sdks/go/BUILD.md
@@ -0,0 +1,63 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Go build
+
+This document describes the [Go](https://golang.org) code layout and build integration
+with Maven. The setup is non-trivial, because the Go toolchain expects a
+certain layout and Maven support is limited.
+
+Goals:
+
+ 1. Go code can be built and tested using Maven w/o special requirements.
+ 1. Go tools such as `go build`, `go test` and `go generate` work as usual.
+ 1. Go code can be pulled with `go get` from `github.com/apache/beam` for users.
+ 1. Go programs can be used in docker container images.
+
+In short, the goals are to make both worlds work well.
+
+### Maven integration
+
+The Go toolchain expects the package name to match the directory structure,
+which in turn must be rooted in `github.com/apache/beam` for `go get` to work.
+This directory prefix is beyond the repo itself and we must copy the Go source
+code into such a layout to invoke the tool chain. We use a single directory
+`sdks/go` for all shared library code and export it as a zip file during the 
+build process to be used by various tools, such as `sdks/java/container`.
+This scheme balances the convenience of combined Go setup with the desire
+for a unified layout across languages. Python seems to do the same.
+
+The container build adds a small twist to the build integration, because
+container images use linux/amd64 but the development setup might not. We
+therefore additionally cross-compile Go binaries for inclusion into container
+images where needed, generally placed in `target/linux_amd64`.
+
+### Go development setup
+
+Developers must clone their git repository into:
+```
+$GOPATH/src/github.com/apache
+
+```
+to match the package structure expected by the code imports. Go users can just
+`go get` the code directly. For example:
+```
+go get github.com/apache/beam/sdks/go/...
+```
+Developers must invoke Go for cross-compilation manually, if desired.

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/cmd/beamctl/artifact.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/artifact.go b/sdks/go/cmd/beamctl/artifact.go
new file mode 100644
index 0000000..d8c2c37
--- /dev/null
+++ b/sdks/go/cmd/beamctl/artifact.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"path/filepath"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/spf13/cobra"
+)
+
+var (
+	artifactCmd = &cobra.Command{ // parent command; stageCmd and listCmd are attached to it in init
+		Use:   "artifact",
+		Short: "Artifact commands",
+	}
+
+	stageCmd = &cobra.Command{
+		Use:   "stage",
+		Short: "Stage local files as artifacts",
+		RunE:  stageFn,
+		Args:  cobra.MinimumNArgs(1), // at least one file to stage must be given
+	}
+
+	listCmd = &cobra.Command{
+		Use:   "list",
+		Short: "List artifacts",
+		RunE:  listFn,
+		Args:  cobra.NoArgs, // list accepts no positional arguments
+	}
+)
+
+func init() {
+	artifactCmd.AddCommand(stageCmd, listCmd) // makes "stage" and "list" subcommands of "artifact"
+}
+
+func stageFn(cmd *cobra.Command, args []string) error {
+	ctx, cc, err := dial()
+	if err != nil {
+		return err
+	}
+	defer cc.Close()
+
+	// (1) Use flat filename as key.
+
+	var files []artifact.KeyedFile
+	for _, arg := range args {
+		files = append(files, artifact.KeyedFile{Key: filepath.Base(arg), Filename: arg})
+	}
+
+	// (2) Stage files in parallel, commit and print out token
+
+	client := pb.NewArtifactStagingServiceClient(cc)
+	list, err := artifact.MultiStage(ctx, client, 10, files)
+	if err != nil {
+		return err
+	}
+	token, err := artifact.Commit(ctx, client, list)
+	if err != nil {
+		return err
+	}
+
+	cmd.Println(token)
+	return nil
+}
+
+// listFn prints the name of every artifact in the manifest, one per line.
+func listFn(cmd *cobra.Command, args []string) error {
+	ctx, conn, err := dial()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	// Ask the retrieval service behind the dialed endpoint for its manifest.
+	retrieval := pb.NewArtifactRetrievalServiceClient(conn)
+	resp, err := retrieval.GetManifest(ctx, &pb.GetManifestRequest{})
+	if err != nil {
+		return err
+	}
+	for _, meta := range resp.GetManifest().GetArtifact() {
+		cmd.Println(meta.Name)
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/cmd/beamctl/main.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/main.go b/sdks/go/cmd/beamctl/main.go
new file mode 100644
index 0000000..9ce47a7
--- /dev/null
+++ b/sdks/go/cmd/beamctl/main.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beamctl is a command line client for the Apache Beam portability services.
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/spf13/cobra"
+	"google.golang.org/grpc"
+)
+
+var (
+	rootCmd = &cobra.Command{
+		Use:   "beamctl",
+		Short: "Apache Beam command line client",
+	}
+
+	id       string // client ID, bound to the persistent --id/-i flag in init
+	endpoint string // server address, bound to the persistent --endpoint/-e flag in init
+)
+
+func init() {
+	rootCmd.AddCommand(artifactCmd) // exposes the artifact command group under beamctl
+	rootCmd.PersistentFlags().StringVarP(&endpoint, "endpoint", "e", "", "Server endpoint, such as localhost:123")
+	rootCmd.PersistentFlags().StringVarP(&id, "id", "i", "", "Client ID")
+}
+
+func main() {
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Fprintln(os.Stderr, err) // report the failure on stderr rather than stdout
+		os.Exit(1)
+	}
+}
+
+// dial opens a gRPC connection to the configured endpoint. It returns the
+// connection together with the context that carries the client ID.
+func dial() (context.Context, *grpc.ClientConn, error) {
+	if endpoint == "" {
+		return nil, nil, errors.New("endpoint not defined")
+	}
+	// time.Minute is handed to grpcx.Dial, presumably as the dial timeout -- TODO confirm.
+	ctx := grpcx.WriteWorkerId(context.Background(), id)
+	conn, err := grpcx.Dial(ctx, endpoint, time.Minute)
+	return ctx, conn, err
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/descriptor.xml
----------------------------------------------------------------------
diff --git a/sdks/go/descriptor.xml b/sdks/go/descriptor.xml
new file mode 100644
index 0000000..15ec4e8
--- /dev/null
+++ b/sdks/go/descriptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<assembly>
+    <id>pkg-sources</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>pkg</directory>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
new file mode 100644
index 0000000..7a11568
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcsproxy
+
+import (
+	"fmt"
+	"io"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/net/context"
+	"google.golang.org/api/storage/v1"
+)
+
+// RetrievalServer is an artifact retrieval server backed by Google
+// Cloud Storage (GCS). It serves a single manifest and ignores
+// the worker id. The server performs no caching or pre-fetching.
+type RetrievalServer struct {
+	md    *pb.Manifest      // manifest handed out by GetManifest
+	blobs map[string]string // artifact name -> GCS URI, filled in by NewRetrievalServer
+}
+
+// ReadProxyManifest reads and parses the proxy manifest from GCS.
+func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
+	bucket, obj, err := gcsx.ParseObject(object)
+	if err != nil {
+		return nil, fmt.Errorf("invalid manifest object %v: %v", object, err)
+	}
+
+	cl, err := gcsx.NewClient(ctx, storage.DevstorageReadOnlyScope)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+	}
+	content, err := gcsx.ReadObject(cl, bucket, obj)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read manifest %v: %v", object, err)
+	}
+	var md pb.ProxyManifest
+	if err := proto.Unmarshal(content, &md); err != nil {
+		return nil, fmt.Errorf("invalid manifest %v: %v", object, err)
+	}
+	return &md, nil
+}
+
+// NewRetrievalServer builds an artifact retrieval server from the given
+// proxy manifest. Every location in it must be a GCS object.
+func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
+	if err := validate(md); err != nil {
+		return nil, err
+	}
+	uris := make(map[string]string)
+	for _, loc := range md.GetLocation() {
+		uri := loc.GetUri()
+		if _, _, err := gcsx.ParseObject(uri); err != nil {
+			return nil, fmt.Errorf("location %v is not a GCS object: %v", uri, err)
+		}
+		uris[loc.GetName()] = uri
+	}
+	return &RetrievalServer{md: md.GetManifest(), blobs: uris}, nil
+}
+
+// GetManifest returns the manifest for all artifacts; ctx and req are not used.
+func (s *RetrievalServer) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
+	return &pb.GetManifestResponse{Manifest: s.md}, nil
+}
+
+// GetArtifact streams the content of the named artifact to the caller. It
+// fails if the name is unknown or if the GCS object cannot be read.
+func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
+	name := req.GetName()
+	uri, found := s.blobs[name]
+	if !found {
+		return fmt.Errorf("artifact %v not found", name)
+	}
+	// URIs stored by NewRetrievalServer were already parsed once without error.
+	bucket, object := parseObject(uri)
+	gcs, err := gcsx.NewClient(stream.Context(), storage.DevstorageReadOnlyScope)
+	if err != nil {
+		return fmt.Errorf("Failed to create client for %v: %v", name, err)
+	}
+
+	// Stream artifact in up to 1MB chunks, reusing a single read buffer.
+	download, err := gcs.Objects.Get(bucket, object).Download()
+	if err != nil {
+		return fmt.Errorf("Failed to read object for %v: %v", name, err)
+	}
+	defer download.Body.Close()
+
+	buf := make([]byte, 1<<20)
+	for {
+		n, readErr := download.Body.Read(buf)
+		if n > 0 {
+			// Forward whatever was read before looking at the read error.
+			if err := stream.Send(&pb.ArtifactChunk{Data: buf[:n]}); err != nil {
+				return fmt.Errorf("chunk send failed: %v", err)
+			}
+		}
+		switch readErr {
+		case nil:
+		case io.EOF:
+			return nil
+		default:
+			return fmt.Errorf("failed to read from %v: %v", uri, readErr)
+		}
+	}
+}
+
+// validate checks that artifacts and locations correspond one-to-one by name.
+func validate(md *pb.ProxyManifest) error {
+	// pending[name] stays true until a location for that artifact has been seen.
+	pending := make(map[string]bool)
+	for _, a := range md.GetManifest().GetArtifact() {
+		if _, dup := pending[a.Name]; dup {
+			return fmt.Errorf("multiple artifact with name %v", a.Name)
+		}
+		pending[a.Name] = true
+	}
+	for _, l := range md.GetLocation() {
+		switch unlocated, known := pending[l.Name]; {
+		case !known:
+			return fmt.Errorf("no artifact named %v for location %v", l.Name, l.Uri)
+		case !unlocated:
+			return fmt.Errorf("multiple locations for %v:%v", l.Name, l.Uri)
+		}
+		pending[l.Name] = false
+	}
+	for name, unlocated := range pending {
+		if unlocated {
+			return fmt.Errorf("no location for %v", name)
+		}
+	}
+	return nil
+}
+
+func parseObject(blob string) (string, string) {
+	bucket, object, err := gcsx.ParseObject(blob)
+	if err != nil {
+		panic(err) // not expected: NewRetrievalServer only stores URIs that gcsx.ParseObject accepted
+	}
+	return bucket, object
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
new file mode 100644
index 0000000..3c67b1a
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package gcsproxy contains artifact staging and retrieval servers backed by GCS.
+package gcsproxy
+
+import (
+	"bytes"
+	"crypto/md5"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"hash"
+	"path"
+	"sync"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/net/context"
+	"google.golang.org/api/storage/v1"
+)
+
+// StagingServer is an artifact staging server backed by Google Cloud Storage
+// (GCS). It commits a single manifest and ignores the staging id.
+//
+// manifest is the object name of the manifest inside bucket, root is the
+// object prefix (a "blobs" directory next to the manifest) under which
+// artifacts are written, and blobs records every staged artifact by name.
+type StagingServer struct {
+	manifest     string
+	bucket, root string
+	blobs        map[string]staged // guarded by mu
+	mu           sync.Mutex
+}
+
+// staged describes an artifact that PutArtifact has written: object is the
+// location built by gcsx.MakeObject for the written object and hash is the
+// base64-encoded MD5 of the content that was streamed.
+type staged struct {
+	object, hash string
+}
+
+// NewStagingServer creates an artifact staging server for the given manifest.
+// The manifest location must parse as a GCS object; artifacts are staged
+// under a "blobs" directory next to it.
+func NewStagingServer(manifest string) (*StagingServer, error) {
+	bucket, object, err := gcsx.ParseObject(manifest)
+	if err != nil {
+		return nil, fmt.Errorf("invalid manifest location: %v", err)
+	}
+
+	srv := &StagingServer{
+		manifest: object,
+		bucket:   bucket,
+		root:     path.Join(path.Dir(object), "blobs"),
+		blobs:    make(map[string]staged),
+	}
+	return srv, nil
+}
+
+// CommitManifest verifies that every artifact in the requested manifest has
+// been staged, then writes the resulting proxy manifest to GCS. The returned
+// staging token is the location of the written manifest.
+func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+	manifest := req.GetManifest()
+
+	// Resolve artifact locations while holding the lock that guards blobs.
+	loc, err := func() ([]*pb.ProxyManifest_Location, error) {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		return matchLocations(manifest.GetArtifact(), s.blobs)
+	}()
+	if err != nil {
+		return nil, err
+	}
+
+	payload, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal proxy manifest: %v", err)
+	}
+
+	client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+	}
+	if err := gcsx.WriteObject(client, s.bucket, s.manifest, bytes.NewReader(payload)); err != nil {
+		return nil, fmt.Errorf("failed to write manifest: %v", err)
+	}
+
+	// Commit returns the location of the manifest as the token, which can
+	// then be used to configure the retrieval proxy. It is redundant right
+	// now, but would be needed for a staging server that serves multiple
+	// jobs. Such a server would also use the ID sent with each request.
+
+	token := gcsx.MakeObject(s.bucket, s.manifest)
+	return &pb.CommitManifestResponse{StagingToken: token}, nil
+}
+
+// matchLocations pairs each artifact with its staged blob and returns the
+// resulting locations. It fails if an artifact was not staged or if its MD5
+// differs from the staged hash. An artifact with an empty MD5 is updated in
+// place to carry the staged hash. Staged blobs that are not listed in the
+// manifest are ignored.
+func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) ([]*pb.ProxyManifest_Location, error) {
+	var locations []*pb.ProxyManifest_Location
+	for _, artifact := range artifacts {
+		blob, found := blobs[artifact.Name]
+		if !found {
+			return nil, fmt.Errorf("artifact %v not staged", artifact.Name)
+		}
+
+		switch {
+		case artifact.Md5 == "":
+			// No expected hash supplied: adopt the hash of the staged content.
+			artifact.Md5 = blob.hash
+		case artifact.Md5 != blob.hash:
+			return nil, fmt.Errorf("staged artifact for %v has invalid MD5: %v, want %v", artifact.Name, blob.hash, artifact.Md5)
+		}
+
+		locations = append(locations, &pb.ProxyManifest_Location{Name: artifact.Name, Uri: blob.object})
+	}
+	return locations, nil
+}
+
+// PutArtifact receives one artifact from the stream and writes it to GCS
+// under the server's root. The first message must carry the metadata; the
+// remaining messages are streamed as the object content. If the metadata
+// names an MD5 it must match the hash of the received content. On success
+// the artifact is recorded so that CommitManifest can reference it.
+func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) error {
+	// The header comes first and identifies the artifact.
+
+	first, err := ps.Recv()
+	if err != nil {
+		return fmt.Errorf("failed to receive header: %v", err)
+	}
+	md := first.GetMetadata()
+	if md == nil {
+		return fmt.Errorf("expected header as first message: %v", first)
+	}
+	target := path.Join(s.root, md.Name)
+
+	// Stream content to GCS. We don't have to worry about partial
+	// or abandoned writes, because object writes are atomic.
+
+	client, err := gcsx.NewClient(ps.Context(), storage.DevstorageReadWriteScope)
+	if err != nil {
+		return fmt.Errorf("failed to create GCS client: %v", err)
+	}
+
+	upload := &reader{md5W: md5.New(), stream: ps}
+	if err := gcsx.WriteObject(client, s.bucket, target, upload); err != nil {
+		return fmt.Errorf("failed to stage artifact %v: %v", md.Name, err)
+	}
+
+	actual := upload.MD5()
+	if want := md.Md5; want != "" && want != actual {
+		return fmt.Errorf("invalid MD5 for artifact %v: %v want %v", md.Name, actual, want)
+	}
+
+	entry := staged{object: gcsx.MakeObject(s.bucket, target), hash: actual}
+	s.mu.Lock()
+	s.blobs[md.Name] = entry
+	s.mu.Unlock()
+
+	return ps.SendAndClose(&pb.PutArtifactResponse{})
+}
+
+// reader is an adapter between the artifact stream and the GCS stream reader.
+// It also computes the MD5 of the content.
+//
+// md5W accumulates the hash of every byte handed out by Read, buf holds the
+// not yet consumed remainder of the most recently received chunk, and stream
+// is the upload stream the chunks are received from.
+type reader struct {
+	md5W   hash.Hash
+	buf    []byte
+	stream pb.ArtifactStagingService_PutArtifactServer
+}
+
+// Read implements io.Reader on top of the upload stream. When the internal
+// buffer is empty it receives the next message and uses its data as the new
+// buffer; it then copies as many buffered bytes as fit into buf and adds
+// exactly those bytes to the running MD5.
+//
+// It returns the error from the stream unchanged (including io.EOF at the
+// end of the upload) and an "empty chunk" error if a received message
+// carries no data.
+func (r *reader) Read(buf []byte) (int, error) {
+	if len(r.buf) == 0 {
+		// Buffer empty. Read from upload stream.
+
+		msg, err := r.stream.Recv()
+		if err != nil {
+			return 0, err // EOF or real error
+		}
+
+		r.buf = msg.GetData().GetData()
+		if len(r.buf) == 0 {
+			return 0, errors.New("empty chunk")
+		}
+	}
+
+	// Copy out bytes from non-empty buffer. The builtin copy transfers
+	// min(len(buf), len(r.buf)) bytes and reports how many that was.
+
+	n := copy(buf, r.buf)
+	if _, err := r.md5W.Write(r.buf[:n]); err != nil {
+		panic(err) // cannot fail
+	}
+	r.buf = r.buf[n:]
+	return n, nil
+}
+
+// MD5 returns the base64 (standard encoding) MD5 digest of all bytes that
+// Read has handed out so far.
+func (r *reader) MD5() string {
+	digest := r.md5W.Sum(nil)
+	return base64.StdEncoding.EncodeToString(digest)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/materialize.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
new file mode 100644
index 0000000..93bed65
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package artifact contains utilities for staging and retrieving artifacts.
+package artifact
+
+import (
+	"bufio"
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+// Materialize is a convenience helper that dials the given endpoint, fetches
+// the manifest and makes sure that every listed artifact is present and
+// uncorrupted under the dest directory. Each artifact name is interpreted as
+// a relative path under dest. Valid artifacts that are already present are
+// not retrieved again. It returns the artifact list from the manifest.
+func Materialize(ctx context.Context, endpoint string, dest string) ([]*pb.ArtifactMetadata, error) {
+	conn, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	retrieval := pb.NewArtifactRetrievalServiceClient(conn)
+
+	resp, err := retrieval.GetManifest(ctx, &pb.GetManifestRequest{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get manifest: %v", err)
+	}
+
+	artifacts := resp.GetManifest().GetArtifact()
+	if err := MultiRetrieve(ctx, retrieval, 10, artifacts, dest); err != nil {
+		return artifacts, err
+	}
+	return artifacts, nil
+}
+
+// MultiRetrieve retrieves multiple artifacts concurrently, using at most 'cpus'
+// goroutines (at least one, and never more than there are artifacts). Each
+// artifact is attempted up to 3 times with a random 1-5 second pause between
+// attempts. The first artifact that exhausts its attempts sets a permanent
+// error; once that is set the remaining artifacts are skipped and the error
+// is returned. Convenience wrapper.
+func MultiRetrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, cpus int, list []*pb.ArtifactMetadata, dest string) error {
+	if len(list) == 0 {
+		return nil
+	}
+	if cpus < 1 {
+		cpus = 1
+	}
+	if len(list) < cpus {
+		cpus = len(list)
+	}
+
+	q := slice2queue(list)
+	var permErr errorx.GuardedError
+
+	var wg sync.WaitGroup
+	for i := 0; i < cpus; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for a := range q {
+				if permErr.Error() != nil {
+					continue // another artifact failed for good: just drain the queue
+				}
+
+				const attempts = 3
+
+				var failures []string
+				for {
+					err := Retrieve(ctx, client, a, dest)
+					if err == nil || permErr.Error() != nil {
+						break // done or give up
+					}
+					failures = append(failures, err.Error())
+					// Give up once 'attempts' tries have failed, so that the
+					// number of tries matches the number reported below.
+					if len(failures) >= attempts {
+						permErr.TrySetError(fmt.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
+						break // give up
+					}
+					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+
+	return permErr.Error()
+}
+
+// Retrieve makes sure the given artifact exists under the dest directory. If
+// a file with a matching MD5 is already there, nothing is done. Otherwise any
+// existing file is removed, the parent directories are created and the
+// artifact is retrieved. A failed retrieval may leave a corrupt/partial
+// local file behind.
+func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, dest string) error {
+	filename := filepath.Join(dest, filepath.FromSlash(a.Name))
+
+	switch _, statErr := os.Stat(filename); {
+	case statErr == nil:
+		// File already exists. Validate or delete.
+
+		hash, hashErr := computeMD5(filename)
+		if hashErr == nil && hash == a.Md5 {
+			// NOTE(herohde) 10/5/2017: We ignore permissions here, because
+			// they may differ from the requested permissions due to umask
+			// settings on unix systems (which we in turn want to respect).
+			// We have no good way to know what to expect and thus assume
+			// any permissions are fine.
+			return nil
+		}
+
+		if rmErr := os.Remove(filename); rmErr != nil {
+			return fmt.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, hashErr, rmErr)
+		}
+		// Bad file successfully deleted; fall through to retrieval.
+
+	case !os.IsNotExist(statErr):
+		return fmt.Errorf("failed to stat %v: %v", filename, statErr)
+	}
+
+	// Either the file never existed or it has just been removed.
+	if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
+		return err
+	}
+	return retrieve(ctx, client, a, filename)
+}
+
+// retrieve downloads the given artifact into filename, which is created (or
+// truncated) with the artifact's permissions. It fails if the MD5 of the
+// received content differs from the artifact's MD5. It performs no cleanup on
+// failure, so a corrupt file may be left behind.
+func retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, filename string) error {
+	chunks, err := client.GetArtifact(ctx, &pb.GetArtifactRequest{Name: a.Name})
+	if err != nil {
+		return err
+	}
+
+	mode := os.FileMode(a.Permissions)
+	file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode)
+	if err != nil {
+		return err
+	}
+	out := bufio.NewWriter(file)
+
+	actual, err := retrieveChunks(chunks, out)
+	if err != nil {
+		file.Close() // drop any buffered content
+		return fmt.Errorf("failed to retrieve chunk for %v: %v", filename, err)
+	}
+	if flushErr := out.Flush(); flushErr != nil {
+		file.Close()
+		return fmt.Errorf("failed to flush chunks for %v: %v", filename, flushErr)
+	}
+	if closeErr := file.Close(); closeErr != nil {
+		return closeErr
+	}
+
+	if actual == a.Md5 {
+		return nil
+	}
+	return fmt.Errorf("bad MD5 for %v: %v, want %v", filename, actual, a.Md5)
+}
+
+// retrieveChunks copies every chunk received from the stream into w until
+// the stream reports io.EOF, and returns the base64-encoded MD5 of all the
+// data written.
+func retrieveChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
+	digest := md5.New()
+	for {
+		chunk, err := stream.Recv()
+		switch {
+		case err == io.EOF:
+			return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil
+		case err != nil:
+			return "", err
+		}
+
+		if _, hashErr := digest.Write(chunk.Data); hashErr != nil {
+			panic(hashErr) // cannot fail
+		}
+		if _, writeErr := w.Write(chunk.Data); writeErr != nil {
+			return "", fmt.Errorf("chunk write failed: %v", writeErr)
+		}
+	}
+}
+
+// computeMD5 returns the base64-encoded (standard encoding) MD5 of the
+// content of the named file, or the error hit while opening or reading it.
+func computeMD5(filename string) (string, error) {
+	file, err := os.Open(filename)
+	if err != nil {
+		return "", err
+	}
+	defer file.Close()
+
+	// Stream the file through the hash; io.Copy stops cleanly at EOF.
+	digest := md5.New()
+	if _, err := io.Copy(digest, file); err != nil {
+		return "", err
+	}
+	return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil
+}
+
+// slice2queue returns a closed, buffered channel preloaded with the elements
+// of list in order.
+func slice2queue(list []*pb.ArtifactMetadata) chan *pb.ArtifactMetadata {
+	queue := make(chan *pb.ArtifactMetadata, len(list))
+	defer close(queue)
+
+	for i := range list {
+		queue <- list[i]
+	}
+	return queue
+}
+
+// queue2slice receives from q until it is closed and returns the received
+// elements in order.
+func queue2slice(q chan *pb.ArtifactMetadata) []*pb.ArtifactMetadata {
+	var out []*pb.ArtifactMetadata
+	for {
+		elm, open := <-q
+		if !open {
+			return out
+		}
+		out = append(out, elm)
+	}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/materialize_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go
new file mode 100644
index 0000000..5d35512
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
+)
+
+// TestRetrieve checks that freshly staged artifacts can be retrieved one by
+// one into an empty directory with the expected content.
+func TestRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	artifacts := populate(ctx, cc, t, []string{"foo", "bar", "baz/baz/baz"}, 300)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	client := pb.NewArtifactRetrievalServiceClient(cc)
+	for _, a := range artifacts {
+		err := Retrieve(ctx, client, a, dst)
+		if err != nil {
+			t.Errorf("failed to retrieve %v: %v", a.Name, err)
+			continue
+		}
+		verifyMD5(t, makeFilename(dst, a.Name), a.Md5)
+	}
+}
+
+// TestMultiRetrieve checks that freshly staged artifacts, including nested
+// keys, can be retrieved concurrently with the expected content.
+func TestMultiRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	artifacts := populate(ctx, cc, t, []string{
+		"1", "2", "3", "4",
+		"a/5", "a/6", "a/7", "a/8",
+		"a/a/9", "a/a/10", "a/b/11", "a/b/12",
+	}, 300)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	client := pb.NewArtifactRetrievalServiceClient(cc)
+	err := MultiRetrieve(ctx, client, 10, artifacts, dst)
+	if err != nil {
+		t.Errorf("failed to retrieve: %v", err)
+	}
+
+	for _, a := range artifacts {
+		verifyMD5(t, makeFilename(dst, a.Name), a.Md5)
+	}
+}
+
+// TestDirtyRetrieve checks retrieval into a directory that already holds
+// files: a pre-existing file with correct content must be kept as is, and
+// one with incorrect content must be replaced.
+func TestDirtyRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idC")
+	staging := pb.NewArtifactStagingServiceClient(cc)
+
+	goodMD := stage(ctx, staging, t, "good", 500, 100)
+	badMD := stage(ctx, staging, t, "bad", 500, 100)
+	list := []*pb.ArtifactMetadata{goodMD, badMD}
+	if _, err := Commit(ctx, staging, list); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	// Kill good file in server by re-staging conflicting content. That ensures
+	// we don't retrieve it.
+	stage(ctx, staging, t, "good", 100, 100)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	goodFile := filepath.Join(dst, "good")
+	badFile := filepath.Join(dst, "bad")
+
+	makeTempFile(t, goodFile, 500) // correct content. Do nothing.
+	makeTempFile(t, badFile, 367)  // invalid content. Delete and retrieve.
+
+	retrieval := pb.NewArtifactRetrievalServiceClient(cc)
+	if err := MultiRetrieve(ctx, retrieval, 2, list, dst); err != nil {
+		t.Fatalf("failed to get retrieve: %v", err)
+	}
+
+	verifyMD5(t, goodFile, goodMD.Md5)
+	verifyMD5(t, badFile, badMD.Md5)
+}
+
+// populate stages one artifact per key and commits them as a manifest. The
+// i'th artifact gets size+7*i bytes, sent in chunks of 97+i bytes, so that
+// sizes and chunk sizes differ slightly between artifacts. It returns the
+// metadata of the staged artifacts.
+func populate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys []string, size int) []*pb.ArtifactMetadata {
+	staging := pb.NewArtifactStagingServiceClient(cc)
+
+	var staged []*pb.ArtifactMetadata
+	for i := range keys {
+		staged = append(staged, stage(ctx, staging, t, keys[i], size+7*i, 97+i))
+	}
+
+	_, err := Commit(ctx, staging, staged)
+	if err != nil {
+		t.Fatalf("failed to commit manifest: %v", err)
+	}
+	return staged
+}
+
+// stage stages an artifact with the given key, size and chunk size, and
+// returns its metadata. The content is always 'z's. Any failure aborts the
+// test.
+func stage(ctx context.Context, scl pb.ArtifactStagingServiceClient, t *testing.T, key string, size, chunkSize int) *pb.ArtifactMetadata {
+	content := make([]byte, size)
+	for i := range content {
+		content[i] = 'z'
+	}
+
+	digest := md5.New()
+	digest.Write(content)
+	md := makeArtifact(key, base64.StdEncoding.EncodeToString(digest.Sum(nil)))
+
+	stream, err := scl.PutArtifact(ctx)
+	if err != nil {
+		t.Fatalf("put failed: %v", err)
+	}
+
+	// The metadata header goes first.
+	header := &pb.PutArtifactRequest{Content: &pb.PutArtifactRequest_Metadata{Metadata: md}}
+	if err := stream.Send(header); err != nil {
+		t.Fatalf("send header failed: %v", err)
+	}
+
+	// Then the content, cut into pieces of at most chunkSize bytes.
+	for start := 0; start < size; start += chunkSize {
+		end := start + chunkSize
+		if end > size {
+			end = size
+		}
+
+		piece := &pb.ArtifactChunk{Data: content[start:end]}
+		req := &pb.PutArtifactRequest{Content: &pb.PutArtifactRequest_Data{Data: piece}}
+		if err := stream.Send(req); err != nil {
+			t.Fatalf("send chunk[%v:%v] failed: %v", start, end, err)
+		}
+	}
+
+	if _, err := stream.CloseAndRecv(); err != nil {
+		t.Fatalf("close failed: %v", err)
+	}
+	return md
+}
+
+// verifyMD5 reports a test error if the MD5 of the named file cannot be
+// computed or differs from the given hash.
+func verifyMD5(t *testing.T, filename, hash string) {
+	actual, err := computeMD5(filename)
+	switch {
+	case err != nil:
+		t.Errorf("failed to compute hash for %v: %v", filename, err)
+	case actual != hash:
+		t.Errorf("file %v has bad MD5: %v, want %v", filename, actual, hash)
+	}
+}
+
+// makeTempDir creates a fresh temporary directory for a test and returns its
+// path. The test is aborted if the directory cannot be created, so callers
+// never receive an empty path to build file names from.
+func makeTempDir(t *testing.T) string {
+	dir, err := ioutil.TempDir("", "artifact_test_")
+	if err != nil {
+		t.Fatalf("Test failure: cannot create temporary directory: %+v", err)
+	}
+	return dir
+}
+
+// makeTempFiles creates one file under dir per key, where the i'th file has
+// size+i bytes, and returns the MD5s of the files in key order.
+func makeTempFiles(t *testing.T, dir string, keys []string, size int) []string {
+	var hashes []string
+	for i := range keys {
+		name := makeFilename(dir, keys[i])
+		hashes = append(hashes, makeTempFile(t, name, size+i))
+	}
+	return hashes
+}
+
+// makeTempFile writes a file of the given size filled with 'z's, creating
+// parent directories as needed, and returns the base64-encoded MD5 of the
+// content. Any failure aborts the test.
+func makeTempFile(t *testing.T, filename string, size int) string {
+	content := make([]byte, size)
+	for i := range content {
+		content[i] = 'z'
+	}
+
+	parent := filepath.Dir(filename)
+	if err := os.MkdirAll(parent, 0755); err != nil {
+		t.Fatalf("cannot create directory for %s: %v", filename, err)
+	}
+	if err := ioutil.WriteFile(filename, content, 0644); err != nil {
+		t.Fatalf("cannot create file %s: %v", filename, err)
+	}
+
+	digest := md5.New()
+	digest.Write(content)
+	sum := digest.Sum(nil)
+	return base64.StdEncoding.EncodeToString(sum)
+}
+
+// makeArtifact returns artifact metadata for the given key and MD5 hash with
+// permissions 0644.
+func makeArtifact(key, hash string) *pb.ArtifactMetadata {
+	md := &pb.ArtifactMetadata{Permissions: 0644}
+	md.Name = key
+	md.Md5 = hash
+	return md
+}
+
+// makeFilename returns the local path under dir for a slash-separated key.
+func makeFilename(dir, key string) string {
+	relative := filepath.FromSlash(key)
+	return filepath.Join(dir, relative)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/server_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/server_test.go b/sdks/go/pkg/beam/artifact/server_test.go
new file mode 100644
index 0000000..c24e308
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/server_test.go
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// startServer launches an in-memory staging and retrieval artifact server on
+// an unused localhost port and returns a gRPC connection to it. Any setup
+// failure aborts the test.
+func startServer(t *testing.T) *grpc.ClientConn {
+	// If port is zero this will bind an unused port.
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Failed to find unused port: %v", err)
+	}
+	addr := lis.Addr().String()
+
+	// A single fake backs both the staging and the retrieval service.
+	fake := &server{m: make(map[string]*manifest)}
+
+	grpcServer := grpc.NewServer()
+	pb.RegisterArtifactStagingServiceServer(grpcServer, fake)
+	pb.RegisterArtifactRetrievalServiceServer(grpcServer, fake)
+	go grpcServer.Serve(lis)
+
+	t.Logf("server listening on %v", addr)
+
+	conn, err := grpc.Dial(addr, grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("failed to dial fake server at %v: %v", addr, err)
+	}
+	return conn
+}
+
+// data is a staged artifact held in memory: chunks holds the data chunks in
+// the order they were received by PutArtifact, and md is the metadata that
+// CommitManifest attaches when the artifact is committed (nil before that).
+type data struct {
+	md     *pb.ArtifactMetadata
+	chunks [][]byte
+}
+
+// manifest holds the staged artifacts for one id together with the manifest
+// set by CommitManifest (md stays nil until then). The handlers lock mu when
+// they update or read the artifact map m.
+type manifest struct {
+	md *pb.Manifest
+	m  map[string]*data // key -> data
+	mu sync.Mutex
+}
+
+// server is an in-memory staging and retrieval artifact server for testing.
+// The map m is accessed through getManifest, which locks mu.
+type server struct {
+	m  map[string]*manifest // token -> manifest
+	mu sync.Mutex
+}
+
+// PutArtifact receives a metadata header followed by non-empty data chunks
+// and stores the chunks, exactly as received, under the artifact name in the
+// manifest of the worker id carried by the stream context.
+func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) error {
+	id, err := grpcx.ReadWorkerId(ps.Context())
+	if err != nil {
+		return fmt.Errorf("expected worker id: %v", err)
+	}
+
+	// The first message must be the header.
+
+	first, err := ps.Recv()
+	if err != nil {
+		return fmt.Errorf("failed to receive header: %v", err)
+	}
+	md := first.GetMetadata()
+	if md == nil {
+		return fmt.Errorf("expected header as first message: %v", first)
+	}
+	key := md.Name
+
+	// All remaining messages must be non-empty chunks.
+
+	var chunks [][]byte
+	for {
+		msg, err := ps.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return err
+		}
+
+		payload := msg.GetData()
+		if payload == nil {
+			return fmt.Errorf("expected data: %v", msg)
+		}
+		if len(payload.GetData()) == 0 {
+			return fmt.Errorf("expected non-empty data: %v", msg)
+		}
+		chunks = append(chunks, payload.GetData())
+	}
+
+	// Updated staged artifact. This test implementation will allow updates to artifacts
+	// that are already committed, but real implementations should manage artifacts in a
+	// way that makes that impossible.
+
+	entry := &data{chunks: chunks}
+	m := s.getManifest(id, true)
+	m.mu.Lock()
+	m.m[key] = entry
+	m.mu.Unlock()
+
+	return ps.SendAndClose(&pb.PutArtifactResponse{})
+}
+
+// CommitManifest records the requested manifest for the worker id found in
+// ctx. It fails, without changing anything, if any listed artifact has not
+// been staged. The worker id is returned as the staging token.
+func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+	id, err := grpcx.ReadWorkerId(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, true)
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	committed := req.GetManifest()
+	listed := committed.GetArtifact()
+
+	// First pass: every listed artifact must have been staged. Fail if not.
+
+	for _, md := range listed {
+		_, staged := m.m[md.Name]
+		if !staged {
+			return nil, fmt.Errorf("artifact %v not staged", md.Name)
+		}
+	}
+
+	// Second pass: update commit. Only one manifest can exist for each staging id.
+
+	for _, md := range listed {
+		m.m[md.Name].md = md
+	}
+	m.md = committed
+
+	return &pb.CommitManifestResponse{StagingToken: id}, nil
+}
+
+// GetManifest returns the manifest committed for the worker id found in ctx.
+// It fails if the context carries no worker id or if no manifest has been
+// committed for that id.
+func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
+	id, err := grpcx.ReadWorkerId(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, false)
+	if m == nil {
+		return nil, fmt.Errorf("manifest for %v not found", id)
+	}
+
+	// m.md is written by CommitManifest while it holds m.mu, so it must
+	// only be read with the lock held.
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if m.md == nil {
+		return nil, fmt.Errorf("manifest for %v not found", id)
+	}
+	return &pb.GetManifestResponse{Manifest: m.md}, nil
+}
+
+// GetArtifact streams the chunks of the requested artifact, exactly as they
+// were received, for the worker id found in the stream context. It fails if
+// the context carries no worker id, if no manifest has been committed for
+// that id, or if the artifact is not part of the committed manifest.
+func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
+	id, err := grpcx.ReadWorkerId(stream.Context())
+	if err != nil {
+		return fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, false)
+	if m == nil {
+		return fmt.Errorf("manifest for %v not found", id)
+	}
+
+	// Validate manifest and artifact and grab the chunks under the lock, so
+	// that we can stream them without holding it. m.md and the entries of
+	// m.m are written by the staging handlers while they hold m.mu.
+
+	m.mu.Lock()
+	if m.md == nil {
+		m.mu.Unlock()
+		return fmt.Errorf("manifest for %v not found", id)
+	}
+	elm, ok := m.m[req.GetName()]
+	if !ok || elm.md == nil {
+		m.mu.Unlock()
+		return fmt.Errorf("manifest for %v does not contain artifact %v", id, req.GetName())
+	}
+	chunks := elm.chunks
+	m.mu.Unlock()
+
+	// Send chunks exactly as we received them.
+
+	for _, chunk := range chunks {
+		if err := stream.Send(&pb.ArtifactChunk{Data: chunk}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// getManifest returns the manifest stored for id. If none exists it either
+// creates and stores an empty one (create is true) or returns nil.
+func (s *server) getManifest(id string, create bool) *manifest {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if existing, ok := s.m[id]; ok {
+		return existing
+	}
+	if !create {
+		return nil
+	}
+
+	fresh := &manifest{m: make(map[string]*data)}
+	s.m[id] = fresh
+	return fresh
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/stage.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/stage.go b/sdks/go/pkg/beam/artifact/stage.go
new file mode 100644
index 0000000..8d97079
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/stage.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+)
+
+// Commit sends a manifest listing the given staged artifacts to the staging
+// service and returns the staging token from the response.
+func Commit(ctx context.Context, client pb.ArtifactStagingServiceClient, artifacts []*pb.ArtifactMetadata) (string, error) {
+	manifest := &pb.Manifest{Artifact: artifacts}
+
+	resp, err := client.CommitManifest(ctx, &pb.CommitManifestRequest{Manifest: manifest})
+	if err != nil {
+		return "", err
+	}
+	return resp.GetStagingToken(), nil
+}
+
+// StageDir stages every file found under the local directory src, using the
+// slash-separated relative paths as keys. It returns nil metadata if the
+// directory cannot be scanned or contains no files. Convenience wrapper.
+func StageDir(ctx context.Context, client pb.ArtifactStagingServiceClient, src string) ([]*pb.ArtifactMetadata, error) {
+	files, err := scan(src)
+	switch {
+	case err != nil:
+		return nil, err
+	case len(files) == 0:
+		return nil, nil
+	}
+	return MultiStage(ctx, client, 10, files)
+}
+
+// MultiStage stages a set of local files with the given keys concurrently,
+// using at most 'cpus' goroutines (at least one, and never more than there
+// are files). It returns the full artifact metadata of the files that were
+// staged, in no particular order. Each file is attempted up to 3 times with
+// a random 1-5 second pause between attempts. The first file that exhausts
+// its attempts sets a permanent error; once that is set the remaining files
+// are skipped and the error is returned. Convenience wrapper.
+func MultiStage(ctx context.Context, client pb.ArtifactStagingServiceClient, cpus int, list []KeyedFile) ([]*pb.ArtifactMetadata, error) {
+	if cpus < 1 {
+		cpus = 1
+	}
+	if len(list) < cpus {
+		cpus = len(list)
+	}
+
+	// Preload the work queue; it is closed so workers stop when it is empty.
+	q := make(chan KeyedFile, len(list))
+	for _, f := range list {
+		q <- f
+	}
+	close(q)
+	var permErr errorx.GuardedError
+
+	// ret has room for every file, so workers never block on it.
+	ret := make(chan *pb.ArtifactMetadata, len(list))
+
+	var wg sync.WaitGroup
+	for i := 0; i < cpus; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for f := range q {
+				if permErr.Error() != nil {
+					continue // another file failed for good: just drain the queue
+				}
+
+				const attempts = 3
+
+				var failures []string
+				for {
+					a, err := Stage(ctx, client, f.Key, f.Filename)
+					if err == nil {
+						ret <- a
+						break
+					}
+					if permErr.Error() != nil {
+						break // give up
+					}
+					failures = append(failures, err.Error())
+					// Give up once 'attempts' tries have failed, so that the
+					// number of tries matches the number reported below.
+					if len(failures) >= attempts {
+						permErr.TrySetError(fmt.Errorf("failed to stage %v in %v attempts: %v", f.Filename, attempts, strings.Join(failures, "; ")))
+						break // give up
+					}
+					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	close(ret)
+
+	return queue2slice(ret), permErr.Error()
+}
+
+// Stage uploads a local file as the artifact with the given key. It computes
+// the file's MD5 up front, sends a metadata header followed by the content,
+// and fails if the MD5 of the bytes actually sent differs from the one
+// computed up front. It returns the full artifact metadata.
+func Stage(ctx context.Context, client pb.ArtifactStagingServiceClient, key, filename string) (*pb.ArtifactMetadata, error) {
+	info, err := os.Stat(filename)
+	if err != nil {
+		return nil, err
+	}
+	localHash, err := computeMD5(filename)
+	if err != nil {
+		return nil, err
+	}
+
+	file, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer file.Close()
+
+	md := &pb.ArtifactMetadata{
+		Name:        key,
+		Permissions: uint32(info.Mode()),
+		Md5:         localHash,
+	}
+
+	stream, err := client.PutArtifact(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	header := &pb.PutArtifactRequest{Content: &pb.PutArtifactRequest_Metadata{Metadata: md}}
+	if err := stream.Send(header); err != nil {
+		stream.CloseAndRecv() // ignore error
+		return nil, fmt.Errorf("failed to send header for %v: %v", filename, err)
+	}
+
+	sentHash, err := stageChunks(stream, file)
+	if err != nil {
+		stream.CloseAndRecv() // ignore error
+		return nil, fmt.Errorf("failed to send chunks for %v: %v", filename, err)
+	}
+	if _, err := stream.CloseAndRecv(); err != nil {
+		return nil, fmt.Errorf("failed to close stream for %v: %v", filename, err)
+	}
+
+	// The file could have changed between hashing and sending.
+	if sentHash != localHash {
+		return nil, fmt.Errorf("unexpected MD5 for sent chunks for %v: %v, want %v", filename, sentHash, localHash)
+	}
+	return md, nil
+}
+
+// stageChunks reads r to the end and sends its content over the stream in
+// chunks of up to 1MB. It returns the base64-encoded MD5 of the bytes read.
+func stageChunks(stream pb.ArtifactStagingService_PutArtifactClient, r io.Reader) (string, error) {
+	digest := md5.New()
+	buf := make([]byte, 1<<20)
+	for {
+		n, readErr := r.Read(buf)
+		if n > 0 {
+			// A read may return data together with an error, so the data
+			// is handled before the error is looked at.
+			payload := buf[:n]
+			if _, err := digest.Write(payload); err != nil {
+				panic(err) // cannot fail
+			}
+
+			req := &pb.PutArtifactRequest{
+				Content: &pb.PutArtifactRequest_Data{
+					Data: &pb.ArtifactChunk{Data: payload},
+				},
+			}
+			if err := stream.Send(req); err != nil {
+				return "", fmt.Errorf("chunk send failed: %v", err)
+			}
+		}
+
+		switch {
+		case readErr == io.EOF:
+			return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil
+		case readErr != nil:
+			return "", readErr
+		}
+	}
+}
+
+// KeyedFile is a key and filename pair: Key is the artifact name to stage
+// under and Filename is the local file holding the content.
+type KeyedFile struct {
+	Key, Filename string
+}
+
+// scan walks dir recursively and returns one KeyedFile per file found, keyed
+// by its slash-separated path relative to dir.
+func scan(dir string) ([]KeyedFile, error) {
+	var files []KeyedFile
+	err := walk(dir, "", &files)
+	if err != nil {
+		return nil, fmt.Errorf("failed to scan %v for artifacts to stage: %v", dir, err)
+	}
+	return files, nil
+}
+
+// walk appends a KeyedFile to accum for every non-directory entry under dir,
+// descending into subdirectories. key is the slash-separated key prefix that
+// corresponds to dir ("" for the root). It returns the first error hit while
+// reading dir or any directory below it.
+func walk(dir, key string, accum *[]KeyedFile) error {
+	list, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+
+	for _, elm := range list {
+		k := makeKey(key, elm.Name())
+		f := filepath.Join(dir, elm.Name())
+
+		if elm.IsDir() {
+			// Propagate failures from subdirectories instead of silently
+			// returning an incomplete file list.
+			if err := walk(f, k, accum); err != nil {
+				return err
+			}
+			continue
+		}
+		*accum = append(*accum, KeyedFile{k, f})
+	}
+	return nil
+}
+
+// makeKey returns name unchanged when prefix is empty, and the
+// slash-separated join of prefix and name otherwise.
+func makeKey(prefix, name string) string {
+	if prefix != "" {
+		return path.Join(prefix, name)
+	}
+	return name
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/stage_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/stage_test.go b/sdks/go/pkg/beam/artifact/stage_test.go
new file mode 100644
index 0000000..d1b32b6
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/stage_test.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
+)
+
+// TestStage verifies that local files can be staged correctly. It stages
+// each generated file individually with Stage, commits the resulting
+// metadata, and then retrieves every artifact to compare its MD5 against
+// the one recorded when the file was created.
+func TestStage(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+	client := pb.NewArtifactStagingServiceClient(cc)
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	keys := []string{"foo", "bar", "baz/baz/baz"}
+
+	src := makeTempDir(t)
+	defer os.RemoveAll(src)
+	md5s := makeTempFiles(t, src, keys, 300)
+
+	var artifacts []*pb.ArtifactMetadata
+	for _, key := range keys {
+		a, err := Stage(ctx, client, key, makeFilename(src, key))
+		if err != nil {
+			// Abort immediately: the metadata from a failed Stage is not
+			// usable, so it must not be appended and committed below.
+			t.Fatalf("failed to stage %v: %v", key, err)
+		}
+		artifacts = append(artifacts, a)
+	}
+	if _, err := Commit(ctx, client, artifacts); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	validate(ctx, cc, t, keys, md5s)
+}
+
+// TestStageDir validates that the files under a local directory, including
+// nested subdirectories, can be staged with a single StageDir call. After
+// committing the returned metadata it retrieves every artifact and compares
+// its MD5 against the one recorded when the file was created.
+func TestStageDir(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+	client := pb.NewArtifactStagingServiceClient(cc)
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	keys := []string{"1", "2", "3", "4", "a/5", "a/6", "a/7", "a/8", "a/a/9", "a/a/10", "a/b/11", "a/b/12"}
+
+	src := makeTempDir(t)
+	defer os.RemoveAll(src)
+	md5s := makeTempFiles(t, src, keys, 300)
+
+	artifacts, err := StageDir(ctx, client, src)
+	if err != nil {
+		// Abort immediately: committing and validating the result of a
+		// failed StageDir would only pile follow-on failures on top of the
+		// real one.
+		t.Fatalf("failed to stage dir %v: %v", src, err)
+	}
+	if _, err := Commit(ctx, client, artifacts); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	validate(ctx, cc, t, keys, md5s)
+}
+
+// validate retrieves each artifact named in keys through the retrieval
+// service reachable over cc and checks that the MD5 reported for its
+// streamed content equals the entry of md5s at the same index. Retrieval
+// failures abort the test; a hash mismatch is recorded and checking
+// continues with the next key.
+func validate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys, md5s []string) {
+	retrieval := pb.NewArtifactRetrievalServiceClient(cc)
+
+	for idx := range keys {
+		name := keys[idx]
+
+		req := &pb.GetArtifactRequest{Name: name}
+		stream, err := retrieval.GetArtifact(ctx, req)
+		if err != nil {
+			t.Fatalf("failed to get artifact for %v: %v", name, err)
+		}
+
+		// The content itself is discarded; only its hash is of interest.
+		got, err := retrieveChunks(stream, ioutil.Discard)
+		if err != nil {
+			t.Fatalf("failed to get chunks for %v: %v", name, err)
+		}
+
+		if want := md5s[idx]; got != want {
+			t.Errorf("incorrect MD5: %v, want %v", got, want)
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/model/gen.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/gen.go b/sdks/go/pkg/beam/model/gen.go
new file mode 100644
index 0000000..9bff5e0
--- /dev/null
+++ b/sdks/go/pkg/beam/model/gen.go
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package model
+
+// TODO(herohde) 9/1/2017: for now, install protoc as described on grpc.io before running go generate.
+
+//go:generate protoc -I../../../../common/runner-api/src/main/proto ../../../../common/runner-api/src/main/proto/beam_artifact_api.proto --go_out=org_apache_beam_runner_v1,plugins=grpc:org_apache_beam_runner_v1
+//go:generate protoc -I../../../../common/fn-api/src/main/proto ../../../../common/fn-api/src/main/proto/beam_provision_api.proto --go_out=org_apache_beam_fn_v1,plugins=grpc:org_apache_beam_fn_v1


[3/3] beam git commit: This closes #3928

Posted by tg...@apache.org.
This closes #3928


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63b54a5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63b54a5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63b54a5b

Branch: refs/heads/master
Commit: 63b54a5b0d64e5eb427fb249d5612d60d34e1741
Parents: ed00299 c18f15c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 6 08:48:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Oct 6 08:48:19 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  49 ++
 runners/gcp/gcemd/Dockerfile                    |  30 +
 runners/gcp/gcemd/main.go                       |  85 +++
 runners/gcp/gcemd/pom.xml                       | 154 ++++
 runners/gcp/gcsproxy/Dockerfile                 |  30 +
 runners/gcp/gcsproxy/main.go                    |  91 +++
 runners/gcp/gcsproxy/pom.xml                    | 154 ++++
 runners/gcp/pom.xml                             |  38 +
 runners/pom.xml                                 |   1 +
 .../src/main/proto/beam_provision_api.proto     |   8 +-
 .../src/main/proto/beam_artifact_api.proto      |  10 +
 sdks/go/BUILD.md                                |  63 ++
 sdks/go/cmd/beamctl/artifact.go                 |  98 +++
 sdks/go/cmd/beamctl/main.go                     |  64 ++
 sdks/go/descriptor.xml                          |  29 +
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 155 ++++
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go   | 200 +++++
 sdks/go/pkg/beam/artifact/materialize.go        | 240 ++++++
 sdks/go/pkg/beam/artifact/materialize_test.go   | 238 ++++++
 sdks/go/pkg/beam/artifact/server_test.go        | 212 ++++++
 sdks/go/pkg/beam/artifact/stage.go              | 238 ++++++
 sdks/go/pkg/beam/artifact/stage_test.go         |  98 +++
 sdks/go/pkg/beam/model/gen.go                   |  21 +
 .../beam_provision_api.pb.go                    | 219 ++++++
 .../beam_artifact_api.pb.go                     | 729 +++++++++++++++++++
 sdks/go/pkg/beam/provision/provision_test.go    |  54 ++
 sdks/go/pkg/beam/provision/provison.go          |  80 ++
 sdks/go/pkg/beam/util/errorx/guarded.go         |  47 ++
 sdks/go/pkg/beam/util/execx/exec.go             |  33 +
 sdks/go/pkg/beam/util/gcsx/gcs.go               |  88 +++
 sdks/go/pkg/beam/util/grpcx/dial.go             |  37 +
 sdks/go/pkg/beam/util/grpcx/metadata.go         |  55 ++
 sdks/go/pom.xml                                 | 163 +++++
 sdks/java/container/Dockerfile                  |  28 +
 sdks/java/container/boot.go                     | 111 +++
 sdks/java/container/pom.xml                     | 184 +++++
 sdks/java/harness/pom.xml                       |  66 ++
 sdks/java/pom.xml                               |   1 +
 sdks/pom.xml                                    |   1 +
 39 files changed, 4201 insertions(+), 1 deletion(-)
----------------------------------------------------------------------