You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/06 15:53:57 UTC

[2/3] beam git commit: [BEAM-2877][BEAM-2881] Add Java SDK harness container image and support

[BEAM-2877][BEAM-2881] Add Java SDK harness container image and support

 * Add support for building Go code and docker container images with
   maven (see sdks/go/BUILD.md for details). The latter is only done
   if the "build-containers" profile is used.
 * Add GCS proxy service for managing artifacts in GCS.
 * Add GCE md service for metadata-configured provision info in GCE.
 * Add beamctl tool for manually interacting with these services.

This PR is focused on the execution side and would need support from
the submission side as well to be functional. The ULR will likely be
the first runner to tie everything together. The contents of the Java
image are kept simple for now.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c18f15cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c18f15cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c18f15cd

Branch: refs/heads/master
Commit: c18f15cdf7eeabcaf64a2c808372683c3d823d4d
Parents: ed00299
Author: Henning Rohde <he...@google.com>
Authored: Sat Sep 30 22:36:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Oct 6 08:48:18 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  49 ++
 runners/gcp/gcemd/Dockerfile                    |  30 +
 runners/gcp/gcemd/main.go                       |  85 +++
 runners/gcp/gcemd/pom.xml                       | 154 ++++
 runners/gcp/gcsproxy/Dockerfile                 |  30 +
 runners/gcp/gcsproxy/main.go                    |  91 +++
 runners/gcp/gcsproxy/pom.xml                    | 154 ++++
 runners/gcp/pom.xml                             |  38 +
 runners/pom.xml                                 |   1 +
 .../src/main/proto/beam_provision_api.proto     |   8 +-
 .../src/main/proto/beam_artifact_api.proto      |  10 +
 sdks/go/BUILD.md                                |  63 ++
 sdks/go/cmd/beamctl/artifact.go                 |  98 +++
 sdks/go/cmd/beamctl/main.go                     |  64 ++
 sdks/go/descriptor.xml                          |  29 +
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 155 ++++
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go   | 200 +++++
 sdks/go/pkg/beam/artifact/materialize.go        | 240 ++++++
 sdks/go/pkg/beam/artifact/materialize_test.go   | 238 ++++++
 sdks/go/pkg/beam/artifact/server_test.go        | 212 ++++++
 sdks/go/pkg/beam/artifact/stage.go              | 238 ++++++
 sdks/go/pkg/beam/artifact/stage_test.go         |  98 +++
 sdks/go/pkg/beam/model/gen.go                   |  21 +
 .../beam_provision_api.pb.go                    | 219 ++++++
 .../beam_artifact_api.pb.go                     | 729 +++++++++++++++++++
 sdks/go/pkg/beam/provision/provision_test.go    |  54 ++
 sdks/go/pkg/beam/provision/provison.go          |  80 ++
 sdks/go/pkg/beam/util/errorx/guarded.go         |  47 ++
 sdks/go/pkg/beam/util/execx/exec.go             |  33 +
 sdks/go/pkg/beam/util/gcsx/gcs.go               |  88 +++
 sdks/go/pkg/beam/util/grpcx/dial.go             |  37 +
 sdks/go/pkg/beam/util/grpcx/metadata.go         |  55 ++
 sdks/go/pom.xml                                 | 163 +++++
 sdks/java/container/Dockerfile                  |  28 +
 sdks/java/container/boot.go                     | 111 +++
 sdks/java/container/pom.xml                     | 184 +++++
 sdks/java/harness/pom.xml                       |  66 ++
 sdks/java/pom.xml                               |   1 +
 sdks/pom.xml                                    |   1 +
 39 files changed, 4201 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d9c2e6d..42671e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,9 @@
     <kubectl>kubectl</kubectl>
     <!-- the standard location for kubernete's config file -->
     <kubeconfig>${user.home}/.kube/config</kubeconfig>
+
+    <!-- For container builds, override to push containers to a registry -->
+    <docker-repository-root>${user.name}</docker-repository-root>
   </properties>
 
   <packaging>pom</packaging>
@@ -364,6 +367,35 @@
         </pluginManagement>
       </build>
     </profile>
+
+    <profile>
+      <id>build-containers</id>
+      <build>
+        <!-- TODO(BEAM-2878): enable container build for releases -->
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>com.spotify</groupId>
+              <artifactId>dockerfile-maven-plugin</artifactId>
+              <executions>
+                <execution>
+                  <id>default</id>
+                  <goals>
+                    <goal>build</goal>
+                    <goal>push</goal>
+                  </goals>
+                  <configuration>
+                    <tag>latest</tag>
+                    <noCache>true</noCache>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
+
   </profiles>
 
   <dependencyManagement>
@@ -1855,6 +1887,23 @@
             </execution>
           </executions>
         </plugin>
+
+        <plugin>
+          <groupId>com.igormaznitsa</groupId>
+          <artifactId>mvn-golang-wrapper</artifactId>
+          <version>2.1.6</version>
+          <extensions>true</extensions>
+          <configuration>
+            <goVersion>1.9</goVersion>
+          </configuration>
+        </plugin>
+
+        <plugin>
+          <groupId>com.spotify</groupId>
+          <artifactId>dockerfile-maven-plugin</artifactId>
+          <version>1.3.5</version>
+          <!-- no executions by default. Use build-containers profile -->
+        </plugin>
       </plugins>
     </pluginManagement>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/Dockerfile
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/Dockerfile b/runners/gcp/gcemd/Dockerfile
new file mode 100644
index 0000000..b8fa8aa
--- /dev/null
+++ b/runners/gcp/gcemd/Dockerfile
@@ -0,0 +1,30 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM debian:stretch
+MAINTAINER "Apache Beam <de...@beam.apache.org>"
+
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y \
+        ca-certificates \
+        && \
+    rm -rf /var/lib/apt/lists/*
+
+ADD target/linux_amd64/gcemd /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/gcemd"]

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/main.go
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/main.go b/runners/gcp/gcemd/main.go
new file mode 100644
index 0000000..66b049e
--- /dev/null
+++ b/runners/gcp/gcemd/main.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// gcemd is a metadata-configured provisioning server for GCE.
+package main
+
+import (
+	"flag"
+	"log"
+	"net"
+
+	"cloud.google.com/go/compute/metadata"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_fn_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/provision"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+var (
+	endpoint = flag.String("endpoint", "", "Server endpoint to expose.")
+)
+
+func main() {
+	flag.Parse()
+	if *endpoint == "" {
+		log.Fatal("No endpoint provided. Use --endpoint=localhost:12345")
+	}
+	if !metadata.OnGCE() {
+		log.Fatal("Not running on GCE")
+	}
+
+	log.Printf("Starting provisioning server on %v", *endpoint)
+
+	jobID, err := metadata.InstanceAttributeValue("job_id")
+	if err != nil {
+		log.Fatalf("Failed to find job ID: %v", err)
+	}
+	jobName, err := metadata.InstanceAttributeValue("job_name")
+	if err != nil {
+		log.Fatalf("Failed to find job name: %v", err)
+	}
+	opt, err := metadata.InstanceAttributeValue("sdk_pipeline_options")
+	if err != nil {
+		log.Fatalf("Failed to find SDK pipeline options: %v", err)
+	}
+	options, err := provision.JSONToProto(opt)
+	if err != nil {
+		log.Fatalf("Failed to parse SDK pipeline options: %v", err)
+	}
+
+	info := &pb.ProvisionInfo{
+		JobId:           jobID,
+		JobName:         jobName,
+		PipelineOptions: options,
+	}
+
+	gs := grpc.NewServer()
+	pb.RegisterProvisionServiceServer(gs, &server{info: info})
+
+	listener, err := net.Listen("tcp", *endpoint)
+	if err != nil {
+		log.Fatalf("Failed to listen to %v: %v", *endpoint, err)
+	}
+	log.Fatalf("Server failed: %v", gs.Serve(listener))
+}
+
+type server struct {
+	info *pb.ProvisionInfo
+}
+
+func (s *server) GetProvisionInfo(ctx context.Context, req *pb.GetProvisionInfoRequest) (*pb.GetProvisionInfoResponse, error) {
+	return &pb.GetProvisionInfoResponse{Info: s.info}, nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcemd/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/gcemd/pom.xml b/runners/gcp/gcemd/pom.xml
new file mode 100644
index 0000000..377e3e0
--- /dev/null
+++ b/runners/gcp/gcemd/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-gcp-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-gcemd</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform :: GCE metadata provisioning</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/gcemd</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>cloud.google.com/go/compute/metadata</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcemd</package>
+              </packages>
+              <resultName>gcemd</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcemd</package>
+              </packages>
+              <resultName>linux_amd64/gcemd</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/gcemd</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/Dockerfile
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/Dockerfile b/runners/gcp/gcsproxy/Dockerfile
new file mode 100644
index 0000000..5ff9141
--- /dev/null
+++ b/runners/gcp/gcsproxy/Dockerfile
@@ -0,0 +1,30 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+FROM debian:stretch
+MAINTAINER "Apache Beam <de...@beam.apache.org>"
+
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y \
+        ca-certificates \
+        && \
+    rm -rf /var/lib/apt/lists/*
+
+ADD target/linux_amd64/gcsproxy /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/gcsproxy"]

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/main.go
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/main.go b/runners/gcp/gcsproxy/main.go
new file mode 100644
index 0000000..ec63032
--- /dev/null
+++ b/runners/gcp/gcsproxy/main.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// gcsproxy is an artifact server backed by GCS and can run in either retrieval
+// (read) or staging (write) mode.
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"net"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact/gcsproxy"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"google.golang.org/grpc"
+)
+
+const (
+	retrieve = "retrieve"
+	stage    = "stage"
+)
+
+var (
+	mode     = flag.String("mode", retrieve, "Proxy mode: retrieve or stage.")
+	endpoint = flag.String("endpoint", "", "Server endpoint to expose.")
+	manifest = flag.String("manifest", "", "Location of proxy manifest.")
+)
+
+func main() {
+	flag.Parse()
+	if *manifest == "" {
+		log.Fatal("No proxy manifest location provided. Use --manifest=gs://foo/bar")
+	}
+	if *endpoint == "" {
+		log.Fatal("No endpoint provided. Use --endpoint=localhost:12345")
+	}
+
+	gs := grpc.NewServer()
+
+	switch *mode {
+	case retrieve:
+		// Retrieval mode. We download the manifest -- but not the
+		// artifacts -- eagerly.
+
+		log.Printf("Starting retrieval proxy from %v on %v", *manifest, *endpoint)
+
+		md, err := gcsproxy.ReadProxyManifest(context.Background(), *manifest)
+		if err != nil {
+			log.Fatalf("Failed to obtain proxy manifest %v: %v", *manifest, err)
+		}
+		proxy, err := gcsproxy.NewRetrievalServer(md)
+		if err != nil {
+			log.Fatalf("Failed to create artifact server: %v", err)
+		}
+		pb.RegisterArtifactRetrievalServiceServer(gs, proxy)
+
+	case stage:
+		// Staging proxy. We update the blobs next to the manifest
+		// in a blobs "directory".
+
+		log.Printf("Starting staging proxy to %v on %v", *manifest, *endpoint)
+
+		proxy, err := gcsproxy.NewStagingServer(*manifest)
+		if err != nil {
+			log.Fatalf("Failed to create artifact server: %v", err)
+		}
+		pb.RegisterArtifactStagingServiceServer(gs, proxy)
+
+	default:
+		log.Fatalf("Invalid mode: '%v', want '%v' or '%v'", *mode, retrieve, stage)
+	}
+
+	listener, err := net.Listen("tcp", *endpoint)
+	if err != nil {
+		log.Fatalf("Failed to listen to %v: %v", *endpoint, err)
+	}
+	log.Fatalf("Server failed: %v", gs.Serve(listener))
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/gcsproxy/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/gcsproxy/pom.xml b/runners/gcp/gcsproxy/pom.xml
new file mode 100644
index 0000000..35be16e
--- /dev/null
+++ b/runners/gcp/gcsproxy/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-gcp-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-gcsproxy</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform :: GCS artifact proxy</name>
+
+  <properties>
+    <!-- Add full path directory structure for 'go get' compatibility -->
+    <go.source.base>${project.basedir}/target/src</go.source.base>
+    <go.source.dir>${go.source.base}/github.com/apache/beam/sdks/go</go.source.dir>
+  </properties>
+
+  <build>
+    <sourceDirectory>${go.source.base}</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-go-cmd-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd/gcsproxy</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>.</directory>
+                  <includes>
+                    <include>*.go</include>
+                  </includes>
+                  <filtering>false</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- CAVEAT: for latest shared files, run mvn install in sdks/go -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependency</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>unpack</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.beam</groupId>
+                  <artifactId>beam-sdks-go</artifactId>
+                  <version>${project.version}</version>
+                  <type>zip</type>
+                  <classifier>pkg-sources</classifier>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${go.source.dir}</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.igormaznitsa</groupId>
+        <artifactId>mvn-golang-wrapper</artifactId>
+        <executions>
+          <execution>
+            <id>go-get-imports</id>
+            <goals>
+              <goal>get</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>google.golang.org/grpc</package>
+                <package>golang.org/x/oauth2/google</package>
+                <package>google.golang.org/api/storage/v1</package>
+              </packages>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcsproxy</package>
+              </packages>
+              <resultName>gcsproxy</resultName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>go-build-linux-amd64</id>
+            <goals>
+              <goal>build</goal>
+            </goals>
+            <phase>compile</phase>
+            <configuration>
+              <packages>
+                <package>github.com/apache/beam/cmd/gcsproxy</package>
+              </packages>
+              <resultName>linux_amd64/gcsproxy</resultName>
+              <targetArch>amd64</targetArch>
+              <targetOs>linux</targetOs>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.spotify</groupId>
+        <artifactId>dockerfile-maven-plugin</artifactId>
+        <configuration>
+          <repository>${docker-repository-root}/gcsproxy</repository>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/gcp/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gcp/pom.xml b/runners/gcp/pom.xml
new file mode 100644
index 0000000..d900212
--- /dev/null
+++ b/runners/gcp/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gcp-parent</artifactId>
+
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: Runners :: Google Cloud Platform</name>
+
+  <modules>
+    <module>gcemd</module>
+    <module>gcsproxy</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index e0a47bd..a9c33d7 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -42,6 +42,7 @@
     <module>google-cloud-dataflow-java</module>
     <module>spark</module>
     <module>apex</module>
+    <module>gcp</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
index fb4f252..b0cd6b4 100644
--- a/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_provision_api.proto
@@ -31,7 +31,7 @@ option java_outer_classname = "ProvisionApi";
 import "google/protobuf/struct.proto";
 
 // A service to provide runtime provisioning information to the SDK harness
-// worker instances -- such as pipeline options, resource constaints and
+// worker instances -- such as pipeline options, resource constraints and
 // other job metadata -- needed by an SDK harness instance to initialize.
 service ProvisionService {
     // Get provision information for the SDK harness worker instance.
@@ -43,6 +43,12 @@ message GetProvisionInfoRequest { }
 
 // A response containing the provision info of a SDK harness worker instance.
 message GetProvisionInfoResponse {
+    ProvisionInfo info = 1;
+}
+
+// Runtime provisioning information for a SDK harness worker instance,
+// such as pipeline options, resource constraints and other job metadata
+message ProvisionInfo {
     // (required) The job ID.
     string job_id = 1;
     // (required) The job name.

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
index 12b0217..e11551c 100644
--- a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
@@ -72,6 +72,16 @@ message Manifest {
   repeated ArtifactMetadata artifact = 1;
 }
 
+// A manifest with location information. It pairs an artifact manifest with
+// a list of locations, each of which associates a name with a URI.
+message ProxyManifest {
+  // The artifact manifest.
+  Manifest manifest = 1;
+  // A (name, uri) pair.
+  // NOTE(review): name presumably refers to an artifact listed in the
+  // manifest and uri to where its content is stored -- confirm with users
+  // of this message.
+  message Location {
+     string name = 1;
+     string uri = 2;
+  }
+  // The locations recorded alongside the manifest.
+  repeated Location location = 2;
+}
+
 // A request to get the manifest of a Job.
 message GetManifestRequest {}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/BUILD.md
----------------------------------------------------------------------
diff --git a/sdks/go/BUILD.md b/sdks/go/BUILD.md
new file mode 100644
index 0000000..1bbfdf0
--- /dev/null
+++ b/sdks/go/BUILD.md
@@ -0,0 +1,63 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Go build
+
+This document describes the [Go](https://golang.org) code layout and build integration
+with Maven. The setup is non-trivial, because the Go toolchain expects a
+certain layout and Maven support is limited.
+
+Goals:
+
+ 1. Go code can be built and tested using Maven w/o special requirements.
+ 1. Go tools such as `go build`, `go test` and `go generate` work as usual.
+ 1. Go code can be pulled with `go get` from `github.com/apache/beam` for users.
+ 1. Go programs can be used in docker container images.
+
+In short, the goals are to make both worlds work well.
+
+### Maven integration
+
+The Go toolchain expects the package name to match the directory structure,
+which in turn must be rooted in `github.com/apache/beam` for `go get` to work.
+This directory prefix is beyond the repo itself and we must copy the Go source
+code into such a layout to invoke the tool chain. We use a single directory
+`sdks/go` for all shared library code and export it as a zip file during the 
+build process to be used by various tools, such as `sdks/java/container`.
+This scheme balances the convenience of combined Go setup with the desire
+for a unified layout across languages. Python seems to do the same.
+
+The container build adds a small twist to the build integration, because
+container images use linux/amd64 but the development setup might not. We
+therefore additionally cross-compile Go binaries for inclusion into container
+images where needed, generally placed in `target/linux_amd64`.
+
+### Go development setup
+
+Developers must clone their git repository into:
+```
+$GOPATH/src/github.com/apache
+
+```
+to match the package structure expected by the code imports. Go users can just
+`go get` the code directly. For example:
+```
+go get github.com/apache/beam/sdks/go/...
+```
+Developers must invoke Go for cross-compilation manually, if desired.

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/cmd/beamctl/artifact.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/artifact.go b/sdks/go/cmd/beamctl/artifact.go
new file mode 100644
index 0000000..d8c2c37
--- /dev/null
+++ b/sdks/go/cmd/beamctl/artifact.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"path/filepath"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/spf13/cobra"
+)
+
+var (
+	// artifactCmd is the parent command grouping the artifact subcommands.
+	// It has no run function of its own.
+	artifactCmd = &cobra.Command{
+		Short: "Artifact commands",
+		Use:   "artifact",
+	}
+
+	// stageCmd runs stageFn and requires at least one argument.
+	stageCmd = &cobra.Command{
+		Short: "Stage local files as artifacts",
+		Use:   "stage",
+		Args:  cobra.MinimumNArgs(1),
+		RunE:  stageFn,
+	}
+
+	// listCmd runs listFn and accepts no arguments.
+	listCmd = &cobra.Command{
+		Short: "List artifacts",
+		Use:   "list",
+		Args:  cobra.NoArgs,
+		RunE:  listFn,
+	}
+)
+
+// init attaches the stage and list subcommands to the artifact command.
+func init() {
+	artifactCmd.AddCommand(stageCmd)
+	artifactCmd.AddCommand(listCmd)
+}
+
+// stageFn implements the "stage" command. It dials the configured endpoint,
+// stages every file named in args, commits the resulting list and prints
+// the token returned by the commit.
+func stageFn(cmd *cobra.Command, args []string) error {
+	ctx, conn, err := dial()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	// Each file is keyed by its base name, i.e., any directory part of the
+	// argument is dropped from the key.
+	var files []artifact.KeyedFile
+	for i := range args {
+		entry := artifact.KeyedFile{Key: filepath.Base(args[i]), Filename: args[i]}
+		files = append(files, entry)
+	}
+
+	// Stage the files (10 is passed to MultiStage, presumably as the
+	// parallelism limit), then commit and print out the token.
+	stagingClient := pb.NewArtifactStagingServiceClient(conn)
+	staged, err := artifact.MultiStage(ctx, stagingClient, 10, files)
+	if err != nil {
+		return err
+	}
+	token, err := artifact.Commit(ctx, stagingClient, staged)
+	if err != nil {
+		return err
+	}
+	cmd.Println(token)
+	return nil
+}
+
+// listFn implements the "list" command. It dials the configured endpoint,
+// fetches the manifest from the retrieval service and prints the name of
+// each artifact in it, one per line.
+func listFn(cmd *cobra.Command, args []string) error {
+	ctx, conn, err := dial()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	retrievalClient := pb.NewArtifactRetrievalServiceClient(conn)
+	resp, err := retrievalClient.GetManifest(ctx, &pb.GetManifestRequest{})
+	if err != nil {
+		return err
+	}
+
+	artifacts := resp.GetManifest().GetArtifact()
+	for i := range artifacts {
+		cmd.Println(artifacts[i].Name)
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/cmd/beamctl/main.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/main.go b/sdks/go/cmd/beamctl/main.go
new file mode 100644
index 0000000..9ce47a7
--- /dev/null
+++ b/sdks/go/cmd/beamctl/main.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beamctl is a command line client for the Apache Beam portability services.
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/spf13/cobra"
+	"google.golang.org/grpc"
+)
+
+var (
+	// rootCmd is the top-level beamctl command. Subcommands and persistent
+	// flags are attached to it in init.
+	rootCmd = &cobra.Command{
+		Short: "Apache Beam command line client",
+		Use:   "beamctl",
+	}
+
+	// endpoint is the server endpoint to dial, bound to the --endpoint/-e flag.
+	endpoint string
+	// id is the client ID written into the request context, bound to the
+	// --id/-i flag.
+	id string
+)
+
+// init registers the artifact subcommand and the persistent --endpoint and
+// --id flags on the root command.
+func init() {
+	rootCmd.AddCommand(artifactCmd)
+
+	flags := rootCmd.PersistentFlags()
+	flags.StringVarP(&endpoint, "endpoint", "e", "", "Server endpoint, such as localhost:123")
+	flags.StringVarP(&id, "id", "i", "", "Client ID")
+}
+
+// main executes the root command. If it fails, the error is reported on
+// standard error (so it does not mix with regular command output on
+// standard output) and the process exits with status 1.
+func main() {
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}
+
+// dial opens a gRPC connection to the endpoint given by the --endpoint flag,
+// using a one minute timeout argument. It returns the connection together
+// with the context to use for calls, which carries the client ID from the
+// --id flag. It fails without dialing if no endpoint was specified.
+func dial() (context.Context, *grpc.ClientConn, error) {
+	if len(endpoint) == 0 {
+		return nil, nil, errors.New("endpoint not defined")
+	}
+
+	base := context.Background()
+	ctx := grpcx.WriteWorkerId(base, id)
+	conn, err := grpcx.Dial(ctx, endpoint, time.Minute)
+	return ctx, conn, err
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/descriptor.xml
----------------------------------------------------------------------
diff --git a/sdks/go/descriptor.xml b/sdks/go/descriptor.xml
new file mode 100644
index 0000000..15ec4e8
--- /dev/null
+++ b/sdks/go/descriptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!--
+    Assembly descriptor with id "pkg-sources": packages the contents of the
+    "pkg" directory into a zip archive, without a base directory inside
+    the archive.
+-->
+<assembly>
+    <id>pkg-sources</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>pkg</directory>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
new file mode 100644
index 0000000..7a11568
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gcsproxy
+
+import (
+	"fmt"
+	"io"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/net/context"
+	"google.golang.org/api/storage/v1"
+)
+
+// RetrievalServer is an artifact retrieval server backed by Google
+// Cloud Storage (GCS). It serves a single manifest and ignores
+// the worker id. The server performs no caching or pre-fetching.
+type RetrievalServer struct {
+	md    *pb.Manifest      // manifest returned by GetManifest
+	blobs map[string]string // location name -> location URI, looked up by GetArtifact
+}
+
+// ReadProxyManifest reads and parses the proxy manifest from GCS.
+func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
+	bucket, obj, err := gcsx.ParseObject(object)
+	if err != nil {
+		return nil, fmt.Errorf("invalid manifest object %v: %v", object, err)
+	}
+
+	cl, err := gcsx.NewClient(ctx, storage.DevstorageReadOnlyScope)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+	}
+	content, err := gcsx.ReadObject(cl, bucket, obj)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read manifest %v: %v", object, err)
+	}
+	var md pb.ProxyManifest
+	if err := proto.Unmarshal(content, &md); err != nil {
+		return nil, fmt.Errorf("invalid manifest %v: %v", object, err)
+	}
+	return &md, nil
+}
+
+// NewRetrievalServer returns an artifact retrieval server for the given
+// proxy manifest. The manifest must pass validation and every location URI
+// must parse as a GCS object; otherwise an error is returned.
+func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
+	if err := validate(md); err != nil {
+		return nil, err
+	}
+
+	locations := md.GetLocation()
+	byName := make(map[string]string, len(locations))
+	for _, loc := range locations {
+		uri := loc.GetUri()
+		// Only check that the URI parses here; the parsed parts are not kept.
+		if _, _, err := gcsx.ParseObject(uri); err != nil {
+			return nil, fmt.Errorf("location %v is not a GCS object: %v", uri, err)
+		}
+		byName[loc.GetName()] = uri
+	}
+
+	srv := &RetrievalServer{md: md.GetManifest(), blobs: byName}
+	return srv, nil
+}
+
+// GetManifest returns the single manifest held by the server. The context
+// and the request are not inspected.
+func (s *RetrievalServer) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
+	resp := &pb.GetManifestResponse{Manifest: s.md}
+	return resp, nil
+}
+
+// GetArtifact streams the content of the artifact named in the request to
+// the caller. The name is looked up among the locations the server was
+// created with; an unknown name is an error. The object is downloaded from
+// GCS with a read-only client created per call and forwarded in chunks of
+// up to 1MB. It returns an error if the client cannot be created, the
+// download fails, or a chunk cannot be read or sent.
+func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
+	key := req.GetName()
+	blob, ok := s.blobs[key]
+	if !ok {
+		return fmt.Errorf("artifact %v not found", key)
+	}
+
+	// The URIs were checked to be parsable when the server was created,
+	// so parseObject is not expected to panic here.
+	bucket, object := parseObject(blob)
+
+	client, err := gcsx.NewClient(stream.Context(), storage.DevstorageReadOnlyScope)
+	if err != nil {
+		return fmt.Errorf("failed to create client for %v: %v", key, err)
+	}
+
+	// Stream artifact in up to 1MB chunks.
+
+	resp, err := client.Objects.Get(bucket, object).Download()
+	if err != nil {
+		return fmt.Errorf("failed to read object for %v: %v", key, err)
+	}
+	defer resp.Body.Close()
+
+	data := make([]byte, 1<<20)
+	for {
+		n, err := resp.Body.Read(data)
+		// A read may return data together with an error (including EOF),
+		// so forward any bytes before looking at the error.
+		if n > 0 {
+			if err := stream.Send(&pb.ArtifactChunk{Data: data[:n]}); err != nil {
+				return fmt.Errorf("chunk send failed: %v", err)
+			}
+		}
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return fmt.Errorf("failed to read from %v: %v", blob, err)
+		}
+	}
+	return nil
+}
+
+// validate checks that the proxy manifest is consistent: artifact names are
+// unique, every location refers to an artifact in the manifest, and every
+// artifact has exactly one location.
+func validate(md *pb.ProxyManifest) error {
+	// unplaced[name] is true from when the artifact is first seen until a
+	// location for it has been seen, and false afterwards.
+	unplaced := make(map[string]bool)
+	for _, art := range md.GetManifest().GetArtifact() {
+		if _, dup := unplaced[art.Name]; dup {
+			return fmt.Errorf("multiple artifact with name %v", art.Name)
+		}
+		unplaced[art.Name] = true
+	}
+
+	for _, loc := range md.GetLocation() {
+		pending, known := unplaced[loc.Name]
+		switch {
+		case !known:
+			return fmt.Errorf("no artifact named %v for location %v", loc.Name, loc.Uri)
+		case !pending:
+			return fmt.Errorf("multiple locations for %v:%v", loc.Name, loc.Uri)
+		}
+		unplaced[loc.Name] = false
+	}
+
+	// Anything still pending never received a location.
+	for name, pending := range unplaced {
+		if pending {
+			return fmt.Errorf("no location for %v", name)
+		}
+	}
+	return nil
+}
+
+// parseObject splits blob into the two components returned by
+// gcsx.ParseObject (used by callers as bucket and object). It panics if
+// blob cannot be parsed.
+func parseObject(blob string) (string, string) {
+	b, o, err := gcsx.ParseObject(blob)
+	if err == nil {
+		return b, o
+	}
+	panic(err)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
new file mode 100644
index 0000000..3c67b1a
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package gcsproxy contains artifact staging and retrieval servers backed by GCS.
+package gcsproxy
+
+import (
+	"bytes"
+	"crypto/md5"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"hash"
+	"path"
+	"sync"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/net/context"
+	"google.golang.org/api/storage/v1"
+)
+
+// StagingServer is an artifact staging server backed by Google Cloud Storage
+// (GCS). It commits a single manifest and ignores the staging id.
+type StagingServer struct {
+	manifest     string            // object name the manifest is written to
+	bucket, root string            // GCS bucket; object name prefix that staged artifacts are written under
+	blobs        map[string]staged // staged artifacts by name; guarded by mu
+	mu           sync.Mutex
+}
+
+// staged records where an uploaded artifact was written and the MD5 that
+// was computed for its content.
+type staged struct {
+	object, hash string // location built by gcsx.MakeObject; base64-encoded MD5
+}
+
+// NewStagingServer returns an artifact staging server for the given
+// manifest location, which must parse as a GCS object. Artifacts are staged
+// under a "blobs" path next to the manifest object.
+func NewStagingServer(manifest string) (*StagingServer, error) {
+	bucket, object, err := gcsx.ParseObject(manifest)
+	if err != nil {
+		return nil, fmt.Errorf("invalid manifest location: %v", err)
+	}
+
+	srv := &StagingServer{
+		manifest: object,
+		bucket:   bucket,
+		root:     path.Join(path.Dir(object), "blobs"),
+		blobs:    map[string]staged{},
+	}
+	return srv, nil
+}
+
+// CommitManifest checks the manifest in the request against the artifacts
+// staged so far, writes the manifest together with the artifact locations
+// to GCS as a ProxyManifest, and returns the location of the written
+// manifest as the staging token.
+func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+	manifest := req.GetManifest()
+
+	// Hold the lock only while consulting the staged blobs.
+	s.mu.Lock()
+	locations, err := matchLocations(manifest.GetArtifact(), s.blobs)
+	s.mu.Unlock()
+	if err != nil {
+		return nil, err
+	}
+
+	proxy := &pb.ProxyManifest{Manifest: manifest, Location: locations}
+	payload, err := proto.Marshal(proxy)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal proxy manifest: %v", err)
+	}
+
+	client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+	}
+	if err := gcsx.WriteObject(client, s.bucket, s.manifest, bytes.NewReader(payload)); err != nil {
+		return nil, fmt.Errorf("failed to write manifest: %v", err)
+	}
+
+	// The token is the location of the manifest, which can then be used to
+	// configure the retrieval proxy. It is redundant right now, but would
+	// be needed for a staging server that serves multiple jobs. Such a
+	// server would also use the ID sent with each request.
+	token := gcsx.MakeObject(s.bucket, s.manifest)
+	return &pb.CommitManifestResponse{StagingToken: token}, nil
+}
+
+// matchLocations verifies that every artifact in the list has been staged
+// and that its MD5, if given, equals the MD5 recorded when it was staged.
+// An artifact with an empty MD5 has it filled in from the staged value. It
+// returns one location per artifact, in the order given. Staged artifacts
+// that are not listed are ignored.
+func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) ([]*pb.ProxyManifest_Location, error) {
+	var locations []*pb.ProxyManifest_Location
+	for _, art := range artifacts {
+		entry, found := blobs[art.Name]
+		if !found {
+			return nil, fmt.Errorf("artifact %v not staged", art.Name)
+		}
+
+		switch {
+		case art.Md5 == "":
+			// No MD5 supplied: adopt the staged one.
+			art.Md5 = entry.hash
+		case art.Md5 != entry.hash:
+			return nil, fmt.Errorf("staged artifact for %v has invalid MD5: %v, want %v", art.Name, entry.hash, art.Md5)
+		}
+
+		locations = append(locations, &pb.ProxyManifest_Location{Name: art.Name, Uri: entry.object})
+	}
+	return locations, nil
+}
+
+// PutArtifact receives an artifact from the upload stream and writes it to
+// GCS under the server's root. The first message must carry the artifact
+// metadata; the remaining messages are consumed as content. If the metadata
+// includes an MD5, it must match the MD5 of the received content. On
+// success the artifact is recorded as staged under its name.
+func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) error {
+	// The header comes first.
+	first, err := ps.Recv()
+	if err != nil {
+		return fmt.Errorf("failed to receive header: %v", err)
+	}
+	meta := first.GetMetadata()
+	if meta == nil {
+		return fmt.Errorf("expected header as first message: %v", first)
+	}
+	// NOTE(review): the name comes from the client and is joined into the
+	// object path as-is; confirm whether names such as "../x" need rejecting.
+	target := path.Join(s.root, meta.Name)
+
+	// Stream content to GCS. We don't have to worry about partial
+	// or abandoned writes, because object writes are atomic.
+	client, err := gcsx.NewClient(ps.Context(), storage.DevstorageReadWriteScope)
+	if err != nil {
+		return fmt.Errorf("failed to create GCS client: %v", err)
+	}
+
+	upload := &reader{md5W: md5.New(), stream: ps}
+	if err := gcsx.WriteObject(client, s.bucket, target, upload); err != nil {
+		return fmt.Errorf("failed to stage artifact %v: %v", meta.Name, err)
+	}
+	sum := upload.MD5()
+	if meta.Md5 != "" && meta.Md5 != sum {
+		return fmt.Errorf("invalid MD5 for artifact %v: %v want %v", meta.Name, sum, meta.Md5)
+	}
+
+	entry := staged{object: gcsx.MakeObject(s.bucket, target), hash: sum}
+	s.mu.Lock()
+	s.blobs[meta.Name] = entry
+	s.mu.Unlock()
+
+	return ps.SendAndClose(&pb.PutArtifactResponse{})
+}
+
+// reader is an adapter between the artifact stream and the GCS stream reader.
+// It also computes the MD5 of the content. Read hands out the data of each
+// received message, possibly across several calls.
+type reader struct {
+	md5W   hash.Hash                                   // running MD5 of all bytes returned by Read
+	buf    []byte                                      // bytes of the current message not yet returned by Read
+	stream pb.ArtifactStagingService_PutArtifactServer // upload stream the messages are received from
+}
+
+// Read copies artifact content into buf. When no bytes are left over from
+// an earlier message, it first receives the next message from the upload
+// stream; an error from the stream (including end of stream) is returned
+// as-is, and a message without data is rejected. Every byte handed out is
+// also added to the running MD5. It returns the number of bytes copied.
+func (r *reader) Read(buf []byte) (int, error) {
+	if len(r.buf) == 0 {
+		// Buffer empty. Read from upload stream.
+
+		msg, err := r.stream.Recv()
+		if err != nil {
+			return 0, err // EOF or real error
+		}
+
+		r.buf = msg.GetData().GetData()
+		if len(r.buf) == 0 {
+			return 0, errors.New("empty chunk")
+		}
+	}
+
+	// Copy out bytes from non-empty buffer. The builtin copy transfers
+	// min(len(buf), len(r.buf)) bytes and reports how many were copied.
+
+	n := copy(buf, r.buf)
+	if _, err := r.md5W.Write(r.buf[:n]); err != nil {
+		panic(err) // cannot fail
+	}
+	r.buf = r.buf[n:]
+	return n, nil
+}
+
+// MD5 returns the base64 (standard encoding) MD5 of the bytes handed out
+// by Read so far.
+func (r *reader) MD5() string {
+	digest := r.md5W.Sum(nil)
+	return base64.StdEncoding.EncodeToString(digest)
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/materialize.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
new file mode 100644
index 0000000..93bed65
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package artifact contains utilities for staging and retrieving artifacts.
+package artifact
+
+import (
+	"bufio"
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+// Materialize is a convenience helper that makes sure all artifacts listed
+// in the manifest served at the endpoint are present and uncorrupted under
+// the dest directory, treating each artifact name as a relative path. Valid
+// artifacts that are already present are not retrieved again. It returns
+// the artifact list from the manifest along with any retrieval error.
+func Materialize(ctx context.Context, endpoint string, dest string) ([]*pb.ArtifactMetadata, error) {
+	conn, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	client := pb.NewArtifactRetrievalServiceClient(conn)
+	resp, err := client.GetManifest(ctx, &pb.GetManifestRequest{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get manifest: %v", err)
+	}
+
+	artifacts := resp.GetManifest().GetArtifact()
+	err = MultiRetrieve(ctx, client, 10, artifacts, dest)
+	return artifacts, err
+}
+
+// MultiRetrieve retrieves multiple artifacts concurrently, using at most 'cpus'
+// goroutines (at least one, and never more than there are artifacts). Each
+// artifact is attempted up to 3 times, with a random pause of 1 to 5 seconds
+// between attempts. Once an artifact has failed all of its attempts, the
+// error is recorded, the remaining artifacts are skipped and the recorded
+// error is returned. Convenience wrapper.
+func MultiRetrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, cpus int, list []*pb.ArtifactMetadata, dest string) error {
+	if len(list) == 0 {
+		return nil
+	}
+	if cpus < 1 {
+		cpus = 1
+	}
+	if len(list) < cpus {
+		cpus = len(list)
+	}
+
+	q := slice2queue(list)
+	var permErr errorx.GuardedError
+
+	var wg sync.WaitGroup
+	for i := 0; i < cpus; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for a := range q {
+				// After a permanent failure, just drain the queue.
+				if permErr.Error() != nil {
+					continue
+				}
+
+				const attempts = 3
+
+				var failures []string
+				for {
+					err := Retrieve(ctx, client, a, dest)
+					if err == nil || permErr.Error() != nil {
+						break // done or give up
+					}
+					failures = append(failures, err.Error())
+					// Give up once 'attempts' tries have failed, so that the
+					// number of tries matches the count in the message.
+					if len(failures) >= attempts {
+						permErr.TrySetError(fmt.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
+						break // give up
+					}
+					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+
+	return permErr.Error()
+}
+
+// Retrieve makes sure the given artifact is present in the dest directory.
+// If a file with a matching MD5 already exists, nothing is retrieved.
+// Otherwise any existing file is deleted and the artifact is retrieved
+// again, creating parent directories as needed. On failure, a
+// corrupt/partial local file may be left behind.
+func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, dest string) error {
+	filename := filepath.Join(dest, filepath.FromSlash(a.Name))
+
+	switch _, statErr := os.Stat(filename); {
+	case statErr == nil:
+		// File already exists. Validate or delete.
+
+		hash, err := computeMD5(filename)
+		if err == nil && a.Md5 == hash {
+			// NOTE(herohde) 10/5/2017: We ignore permissions here, because
+			// they may differ from the requested permissions due to umask
+			// settings on unix systems (which we in turn want to respect).
+			// We have no good way to know what to expect and thus assume
+			// any permissions are fine.
+			return nil
+		}
+
+		if err2 := os.Remove(filename); err2 != nil {
+			return fmt.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
+		}
+		// The bad file was deleted; fall through to retrieve it again.
+
+	case !os.IsNotExist(statErr):
+		return fmt.Errorf("failed to stat %v: %v", filename, statErr)
+	}
+
+	parent := filepath.Dir(filename)
+	if err := os.MkdirAll(parent, 0755); err != nil {
+		return err
+	}
+	return retrieve(ctx, client, a, filename)
+}
+
+// retrieve downloads the given artifact into the given filename, creating
+// the file with the artifact's permissions (or truncating an existing one).
+// It fails if the MD5 of the received content differs from the artifact's
+// MD5. It does not clean up on failure and may leave a corrupt file.
+func retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, filename string) error {
+	stream, err := client.GetArtifact(ctx, &pb.GetArtifactRequest{Name: a.Name})
+	if err != nil {
+		return err
+	}
+
+	mode := os.FileMode(a.Permissions)
+	fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode)
+	if err != nil {
+		return err
+	}
+	out := bufio.NewWriter(fd)
+
+	sum, err := retrieveChunks(stream, out)
+	if err != nil {
+		fd.Close() // drop any buffered content
+		return fmt.Errorf("failed to retrieve chunk for %v: %v", filename, err)
+	}
+	if err := out.Flush(); err != nil {
+		fd.Close()
+		return fmt.Errorf("failed to flush chunks for %v: %v", filename, err)
+	}
+	if err := fd.Close(); err != nil {
+		return err
+	}
+
+	if sum == a.Md5 {
+		return nil
+	}
+	return fmt.Errorf("bad MD5 for %v: %v, want %v", filename, sum, a.Md5)
+}
+
+// retrieveChunks writes every chunk received from the stream to w until
+// the stream ends, and returns the base64 (standard encoding) MD5 of the
+// data. It stops at the first receive or write error.
+func retrieveChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
+	digest := md5.New()
+	for {
+		chunk, err := stream.Recv()
+		switch {
+		case err == io.EOF:
+			return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil
+		case err != nil:
+			return "", err
+		}
+
+		payload := chunk.Data
+		if _, err := digest.Write(payload); err != nil {
+			panic(err) // cannot fail
+		}
+		if _, err := w.Write(payload); err != nil {
+			return "", fmt.Errorf("chunk write failed: %v", err)
+		}
+	}
+}
+
+// computeMD5 returns the base64 (standard encoding) MD5 of the content of
+// the named file. It fails if the file cannot be opened or read.
+func computeMD5(filename string) (string, error) {
+	fd, err := os.Open(filename)
+	if err != nil {
+		return "", err
+	}
+	defer fd.Close()
+
+	// Stream the file into the hash; writes to the hash do not fail, so any
+	// error here comes from reading the file.
+	digest := md5.New()
+	if _, err := io.Copy(digest, fd); err != nil {
+		return "", err
+	}
+	return base64.StdEncoding.EncodeToString(digest.Sum(nil)), nil
+}
+
+// slice2queue returns a closed, buffered channel holding the elements of
+// list in order, so that it can be drained by ranging over it.
+func slice2queue(list []*pb.ArtifactMetadata) chan *pb.ArtifactMetadata {
+	queue := make(chan *pb.ArtifactMetadata, len(list))
+	defer close(queue)
+
+	// The buffer holds the whole list, so these sends never block.
+	for i := range list {
+		queue <- list[i]
+	}
+	return queue
+}
+
+// queue2slice receives from q until it is closed and returns the received
+// elements in order. The result is nil if nothing was received.
+func queue2slice(q chan *pb.ArtifactMetadata) []*pb.ArtifactMetadata {
+	var out []*pb.ArtifactMetadata
+	for {
+		elm, open := <-q
+		if !open {
+			return out
+		}
+		out = append(out, elm)
+	}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/materialize_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go
new file mode 100644
index 0000000..5d35512
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
+)
+
+// TestRetrieve tests that freshly staged artifacts can be retrieved one by one.
+func TestRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	keys := []string{"foo", "bar", "baz/baz/baz"} // includes a nested key
+	artifacts := populate(ctx, cc, t, keys, 300)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	client := pb.NewArtifactRetrievalServiceClient(cc)
+	for _, a := range artifacts {
+		filename := makeFilename(dst, a.Name) // where the artifact is expected to end up under dst
+		if err := Retrieve(ctx, client, a, dst); err != nil {
+			t.Errorf("failed to retrieve %v: %v", a.Name, err)
+			continue
+		}
+		verifyMD5(t, filename, a.Md5)
+	}
+}
+
+// TestMultiRetrieve tests that freshly staged artifacts can be retrieved
+// concurrently with a single MultiRetrieve call.
+func TestMultiRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	keys := []string{"1", "2", "3", "4", "a/5", "a/6", "a/7", "a/8", "a/a/9", "a/a/10", "a/b/11", "a/b/12"}
+	artifacts := populate(ctx, cc, t, keys, 300)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	client := pb.NewArtifactRetrievalServiceClient(cc)
+	if err := MultiRetrieve(ctx, client, 10, artifacts, dst); err != nil { // NOTE(review): 10 is presumably the worker count -- confirm against MultiRetrieve
+		t.Errorf("failed to retrieve: %v", err)
+	}
+
+	for _, a := range artifacts { // still verify each file after a failure, so every bad file is reported
+		verifyMD5(t, makeFilename(dst, a.Name), a.Md5)
+	}
+}
+
+// TestDirtyRetrieve tests that we can successfully retrieve files in a
+// dirty setup with correct and incorrect pre-existing files.
+func TestDirtyRetrieve(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idC")
+	scl := pb.NewArtifactStagingServiceClient(cc)
+
+	list := []*pb.ArtifactMetadata{
+		stage(ctx, scl, t, "good", 500, 100), // 500 bytes: matches the local "good" file created below
+		stage(ctx, scl, t, "bad", 500, 100),  // 500 bytes: differs from the local "bad" file created below
+	}
+	if _, err := Commit(ctx, scl, list); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	// Kill good file in server by re-staging conflicting content. That ensures
+	// we don't retrieve it: the test server drops the committed metadata of a
+	// re-staged artifact, so a GetArtifact call for "good" would fail.
+	stage(ctx, scl, t, "good", 100, 100)
+
+	dst := makeTempDir(t)
+	defer os.RemoveAll(dst)
+
+	good := filepath.Join(dst, "good")
+	bad := filepath.Join(dst, "bad")
+
+	makeTempFile(t, good, 500) // correct content. Do nothing.
+	makeTempFile(t, bad, 367)  // invalid content. Delete and retrieve.
+
+	rcl := pb.NewArtifactRetrievalServiceClient(cc)
+	if err := MultiRetrieve(ctx, rcl, 2, list, dst); err != nil {
+		t.Fatalf("failed to get retrieve: %v", err)
+	}
+
+	verifyMD5(t, good, list[0].Md5)
+	verifyMD5(t, bad, list[1].Md5)
+}
+
+// populate stages and commits a set of artifacts with the given keys, each
+// with slightly different sizes and chunk sizes.
+func populate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys []string, size int) []*pb.ArtifactMetadata {
+	scl := pb.NewArtifactStagingServiceClient(cc)
+
+	var artifacts []*pb.ArtifactMetadata
+	for i, key := range keys {
+		a := stage(ctx, scl, t, key, size+7*i, 97+i) // the i'th artifact has size+7*i bytes, sent in chunks of 97+i bytes
+		artifacts = append(artifacts, a)
+	}
+	if _, err := Commit(ctx, scl, artifacts); err != nil {
+		t.Fatalf("failed to commit manifest: %v", err)
+	}
+	return artifacts
+}
+
+// stage stages an artifact with the given key, size and chunk size. The content is
+// always 'z's. It returns the metadata sent in the header and aborts the test on failure.
+func stage(ctx context.Context, scl pb.ArtifactStagingServiceClient, t *testing.T, key string, size, chunkSize int) *pb.ArtifactMetadata {
+	data := make([]byte, size)
+	for i := 0; i < size; i++ {
+		data[i] = 'z'
+	}
+
+	md5W := md5.New()
+	md5W.Write(data)
+	hash := base64.StdEncoding.EncodeToString(md5W.Sum(nil))
+	md := makeArtifact(key, hash)
+
+	stream, err := scl.PutArtifact(ctx)
+	if err != nil {
+		t.Fatalf("put failed: %v", err)
+	}
+	header := &pb.PutArtifactRequest{ // the first message on the stream carries the metadata
+		Content: &pb.PutArtifactRequest_Metadata{
+			Metadata: md,
+		},
+	}
+	if err := stream.Send(header); err != nil {
+		t.Fatalf("send header failed: %v", err)
+	}
+
+	for i := 0; i < size; i += chunkSize { // NOTE: never terminates if chunkSize <= 0 and size > 0
+		end := i + chunkSize
+		if size < end { // the last chunk may be shorter than chunkSize
+			end = size
+		}
+
+		chunk := &pb.PutArtifactRequest{
+			Content: &pb.PutArtifactRequest_Data{
+				Data: &pb.ArtifactChunk{
+					Data: data[i:end],
+				},
+			},
+		}
+		if err := stream.Send(chunk); err != nil {
+			t.Fatalf("send chunk[%v:%v] failed: %v", i, end, err)
+		}
+	}
+	if _, err := stream.CloseAndRecv(); err != nil {
+		t.Fatalf("close failed: %v", err)
+	}
+	return md
+}
+
+func verifyMD5(t *testing.T, filename, hash string) { // reports a test error unless the file's base64 MD5 equals hash
+	actual, err := computeMD5(filename)
+	if err != nil {
+		t.Errorf("failed to compute hash for %v: %v", filename, err)
+		return
+	}
+	if actual != hash {
+		t.Errorf("file %v has bad MD5: %v, want %v", filename, actual, hash)
+	}
+}
+
+func makeTempDir(t *testing.T) string { // creates a fresh temporary directory; the caller removes it
+	dir, err := ioutil.TempDir("", "artifact_test_")
+	if err != nil {
+		t.Fatalf("Test failure: cannot create temporary directory: %+v", err) // abort: continuing with dir == "" would create files relative to the working directory
+	}
+	return dir
+}
+
+func makeTempFiles(t *testing.T, dir string, keys []string, size int) []string { // creates one file per key under dir; returns their MD5s in key order
+	var md5s []string
+	for i, key := range keys {
+		hash := makeTempFile(t, makeFilename(dir, key), size+i) // the i'th file is size+i bytes long
+		md5s = append(md5s, hash)
+	}
+	return md5s
+}
+
+func makeTempFile(t *testing.T, filename string, size int) string { // writes size 'z' bytes to filename; returns their base64 MD5
+	data := make([]byte, size)
+	for i := 0; i < size; i++ {
+		data[i] = 'z'
+	}
+
+	if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil { // nested keys need their parent directories
+		t.Fatalf("cannot create directory for %s: %v", filename, err)
+	}
+	if err := ioutil.WriteFile(filename, data, 0644); err != nil {
+		t.Fatalf("cannot create file %s: %v", filename, err)
+	}
+
+	md5W := md5.New()
+	md5W.Write(data)
+	return base64.StdEncoding.EncodeToString(md5W.Sum(nil))
+}
+
+func makeArtifact(key, hash string) *pb.ArtifactMetadata { // builds metadata for key with the given MD5 and 0644 permissions
+	return &pb.ArtifactMetadata{
+		Name:        key,
+		Md5:         hash,
+		Permissions: 0644,
+	}
+}
+
+func makeFilename(dir, key string) string { // maps a '/'-separated key to an OS-specific path under dir
+	return filepath.Join(dir, filepath.FromSlash(key))
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/server_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/server_test.go b/sdks/go/pkg/beam/artifact/server_test.go
new file mode 100644
index 0000000..c24e308
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/server_test.go
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// startServer starts an in-memory staging and retrieval artifact server
+// and returns a gRPC connection to it. The caller closes the connection.
+func startServer(t *testing.T) *grpc.ClientConn {
+	// If port is zero this will bind an unused port.
+	listener, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Failed to find unused port: %v", err)
+	}
+	endpoint := listener.Addr().String()
+
+	real := &server{m: make(map[string]*manifest)} // NOTE: this local shadows the builtin real
+
+	gs := grpc.NewServer()
+	pb.RegisterArtifactStagingServiceServer(gs, real) // one server value backs both services
+	pb.RegisterArtifactRetrievalServiceServer(gs, real)
+	go gs.Serve(listener) // serves in the background; this helper never stops the server
+
+	t.Logf("server listening on %v", endpoint)
+
+	cc, err := grpc.Dial(endpoint, grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("failed to dial fake server at %v: %v", endpoint, err)
+	}
+	return cc
+}
+
+type data struct {
+	md     *pb.ArtifactMetadata // set by CommitManifest; nil while the artifact is only staged
+	chunks [][]byte             // content, chunked exactly as received by PutArtifact
+}
+
+type manifest struct {
+	md *pb.Manifest     // last committed manifest; nil until CommitManifest succeeds
+	m  map[string]*data // key -> data
+	mu sync.Mutex       // held by the server methods while they update md and m
+}
+
+// server is an in-memory staging and retrieval artifact server for testing.
+type server struct {
+	m  map[string]*manifest // token -> manifest
+	mu sync.Mutex           // held by getManifest while it reads or updates m
+}
+
+func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) error { // stores a streamed artifact under the caller's worker id
+	id, err := grpcx.ReadWorkerId(ps.Context())
+	if err != nil {
+		return fmt.Errorf("expected worker id: %v", err)
+	}
+
+	// Read header. The first message must carry the artifact metadata.
+
+	header, err := ps.Recv()
+	if err != nil {
+		return fmt.Errorf("failed to receive header: %v", err)
+	}
+	if header.GetMetadata() == nil {
+		return fmt.Errorf("expected header as first message: %v", header)
+	}
+	key := header.GetMetadata().Name
+
+	// Read chunks. Every later message must carry non-empty data.
+
+	var chunks [][]byte
+	for {
+		msg, err := ps.Recv()
+		if err != nil {
+			if err == io.EOF { // the client closed its side: the upload is complete
+				break
+			}
+			return err
+		}
+
+		if msg.GetData() == nil {
+			return fmt.Errorf("expected data: %v", msg)
+		}
+		if len(msg.GetData().GetData()) == 0 {
+			return fmt.Errorf("expected non-empty data: %v", msg)
+		}
+		chunks = append(chunks, msg.GetData().GetData())
+	}
+
+	// Update staged artifact. This test implementation will allow updates to artifacts
+	// that are already committed, but real implementations should manage artifacts in a
+	// way that makes that impossible.
+
+	m := s.getManifest(id, true) // creates the manifest for id on first use
+	m.mu.Lock()
+	m.m[key] = &data{chunks: chunks} // md is left nil, so a re-staged artifact is no longer retrievable
+	m.mu.Unlock()
+
+	return ps.SendAndClose(&pb.PutArtifactResponse{})
+}
+
+func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) { // commits req's manifest for the caller's worker id
+	id, err := grpcx.ReadWorkerId(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, true)
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	// Verify that all artifacts are properly staged. Fail if not. Nothing is
+	// modified before this check has passed for every artifact.
+
+	artifacts := req.GetManifest().GetArtifact()
+	for _, md := range artifacts {
+		if _, ok := m.m[md.Name]; !ok { // NOTE: md.Name panics if the manifest contains a nil entry
+			return nil, fmt.Errorf("artifact %v not staged", md.Name)
+		}
+	}
+
+	// Update commit. Only one manifest can exist for each staging id.
+
+	for _, md := range artifacts {
+		m.m[md.Name].md = md // attaching the metadata is what makes the artifact retrievable
+	}
+	m.md = req.GetManifest()
+
+	return &pb.CommitManifestResponse{StagingToken: id}, nil // the worker id doubles as the staging token
+}
+
+// GetManifest returns the manifest committed for the worker id found in ctx.
+// It fails if ctx carries no worker id or if no manifest has been committed
+// for that id.
+func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
+	id, err := grpcx.ReadWorkerId(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, false)
+	if m == nil {
+		return nil, fmt.Errorf("manifest for %v not found", id)
+	}
+
+	// m.md is written by CommitManifest while holding m.mu, so it may only be
+	// read with the lock held. Checking it before locking is a data race.
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if m.md == nil {
+		return nil, fmt.Errorf("manifest for %v not found", id)
+	}
+	return &pb.GetManifestResponse{Manifest: m.md}, nil
+}
+
+// GetArtifact streams the chunks of the named artifact to the caller. The
+// artifact must be part of the manifest committed for the worker id found in
+// the stream context; staged but uncommitted artifacts are rejected.
+func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
+	id, err := grpcx.ReadWorkerId(stream.Context())
+	if err != nil {
+		return fmt.Errorf("expected worker id: %v", err)
+	}
+
+	m := s.getManifest(id, false)
+	if m == nil {
+		return fmt.Errorf("manifest for %v not found", id)
+	}
+
+	// Validate artifact and grab chunks so that we can stream them without
+	// holding the lock. m.md is written by CommitManifest under m.mu, so it
+	// is checked under the lock as well to avoid a data race.
+
+	m.mu.Lock()
+	if m.md == nil {
+		m.mu.Unlock()
+		return fmt.Errorf("manifest for %v not found", id)
+	}
+	elm, ok := m.m[req.GetName()]
+	if !ok || elm.md == nil {
+		m.mu.Unlock()
+		return fmt.Errorf("manifest for %v does not contain artifact %v", id, req.GetName())
+	}
+	chunks := elm.chunks
+	m.mu.Unlock()
+
+	// Send chunks exactly as we received them.
+
+	for _, chunk := range chunks {
+		if err := stream.Send(&pb.ArtifactChunk{Data: chunk}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *server) getManifest(id string, create bool) *manifest { // returns the manifest for id; nil if absent and create is false
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	ret, ok := s.m[id]
+	if !ok && create { // first use of this id: register an empty manifest
+		ret = &manifest{m: make(map[string]*data)}
+		s.m[id] = ret
+	}
+	return ret
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/stage.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/stage.go b/sdks/go/pkg/beam/artifact/stage.go
new file mode 100644
index 0000000..8d97079
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/stage.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"crypto/md5"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+)
+
+// Commit commits a manifest with the given staged artifacts. It returns the
+// staging token, if successful, and an empty token together with the error otherwise.
+func Commit(ctx context.Context, client pb.ArtifactStagingServiceClient, artifacts []*pb.ArtifactMetadata) (string, error) {
+	req := &pb.CommitManifestRequest{
+		Manifest: &pb.Manifest{ // the manifest consists solely of the artifact list
+			Artifact: artifacts,
+		},
+	}
+	resp, err := client.CommitManifest(ctx, req)
+	if err != nil {
+		return "", err
+	}
+	return resp.GetStagingToken(), nil
+}
+
+// StageDir stages every file under the local directory src, keyed by its '/'-separated path relative to src. Convenience wrapper.
+func StageDir(ctx context.Context, client pb.ArtifactStagingServiceClient, src string) ([]*pb.ArtifactMetadata, error) {
+	list, err := scan(src)
+	if err != nil || len(list) == 0 { // scan failed, or there is nothing to stage (then the result is nil, nil)
+		return nil, err
+	}
+	return MultiStage(ctx, client, 10, list) // stage with up to 10 concurrent workers
+}
+
+// MultiStage stages a set of local files with the given keys. It returns
+// the full artifact metadata. It tries each artifact up to 3 times and gives
+// up on all remaining work once one artifact has failed permanently; the
+// metadata of the artifacts staged so far is then returned with the error.
+// The files are staged by up to cpus concurrent workers, so the returned
+// metadata is not necessarily in list order. Convenience wrapper.
+func MultiStage(ctx context.Context, client pb.ArtifactStagingServiceClient, cpus int, list []KeyedFile) ([]*pb.ArtifactMetadata, error) {
+	if cpus < 1 {
+		cpus = 1
+	}
+	if len(list) < cpus {
+		cpus = len(list)
+	}
+
+	q := make(chan KeyedFile, len(list)) // pre-filled work queue shared by all workers
+	for _, f := range list {
+		q <- f
+	}
+	close(q)
+	var permErr errorx.GuardedError
+
+	ret := make(chan *pb.ArtifactMetadata, len(list)) // sized so that workers never block on send
+
+	var wg sync.WaitGroup
+	for i := 0; i < cpus; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for f := range q {
+				if permErr.Error() != nil {
+					continue // drain the queue without doing any work
+				}
+
+				const attempts = 3
+
+				var failures []string
+				for {
+					a, err := Stage(ctx, client, f.Key, f.Filename)
+					if err == nil {
+						ret <- a
+						break
+					}
+					if permErr.Error() != nil {
+						break // give up
+					}
+					failures = append(failures, err.Error())
+					if len(failures) >= attempts { // was ">", which allowed a 4th try despite the message below
+						permErr.TrySetError(fmt.Errorf("failed to stage %v in %v attempts: %v", f.Filename, attempts, strings.Join(failures, "; ")))
+						break // give up
+					}
+					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second) // random 1-5s back-off before retrying
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	close(ret)
+
+	return queue2slice(ret), permErr.Error()
+}
+
+// Stage stages a local file as an artifact with the given key. It computes
+// the MD5 and returns the full artifact metadata. The file is read twice:
+// once to compute the MD5 for the header and once to send the content.
+func Stage(ctx context.Context, client pb.ArtifactStagingServiceClient, key, filename string) (*pb.ArtifactMetadata, error) {
+	stat, err := os.Stat(filename)
+	if err != nil {
+		return nil, err
+	}
+	hash, err := computeMD5(filename) // first pass: the MD5 goes into the metadata header
+	if err != nil {
+		return nil, err
+	}
+	md := &pb.ArtifactMetadata{
+		Name:        key,
+		Permissions: uint32(stat.Mode()),
+		Md5:         hash,
+	}
+
+	fd, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer fd.Close()
+
+	stream, err := client.PutArtifact(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	header := &pb.PutArtifactRequest{ // the metadata is sent as the first message, ahead of any data
+		Content: &pb.PutArtifactRequest_Metadata{
+			Metadata: md,
+		},
+	}
+	if err := stream.Send(header); err != nil {
+		stream.CloseAndRecv() // ignore error
+		return nil, fmt.Errorf("failed to send header for %v: %v", filename, err)
+	}
+	stagedHash, err := stageChunks(stream, fd) // second pass: send the content and hash what was sent
+	if err != nil {
+		stream.CloseAndRecv() // ignore error
+		return nil, fmt.Errorf("failed to send chunks for %v: %v", filename, err)
+	}
+	if _, err := stream.CloseAndRecv(); err != nil {
+		return nil, fmt.Errorf("failed to close stream for %v: %v", filename, err)
+	}
+	if hash != stagedHash { // the two passes saw different bytes, e.g. the file changed in between
+		return nil, fmt.Errorf("unexpected MD5 for sent chunks for %v: %v, want %v", filename, stagedHash, hash)
+	}
+	return md, nil
+}
+
+func stageChunks(stream pb.ArtifactStagingService_PutArtifactClient, r io.Reader) (string, error) { // sends r in chunks; returns the base64 MD5 of the bytes sent
+	md5W := md5.New()
+	data := make([]byte, 1<<20) // chunks are at most 1 MiB; the buffer is reused for every Read
+	for {
+		n, err := r.Read(data)
+		if n > 0 { // send whatever was read before looking at err
+			if _, err := md5W.Write(data[:n]); err != nil {
+				panic(err) // cannot fail
+			}
+
+			chunk := &pb.PutArtifactRequest{
+				Content: &pb.PutArtifactRequest_Data{
+					Data: &pb.ArtifactChunk{
+						Data: data[:n],
+					},
+				},
+			}
+			if err := stream.Send(chunk); err != nil {
+				return "", fmt.Errorf("chunk send failed: %v", err)
+			}
+		}
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return "", err
+		}
+	}
+	return base64.StdEncoding.EncodeToString(md5W.Sum(nil)), nil
+}
+
+// KeyedFile pairs an artifact key with the local filename that holds its content.
+type KeyedFile struct {
+	Key, Filename string
+}
+
+func scan(dir string) ([]KeyedFile, error) { // lists the files under dir, keyed by their path relative to dir
+	var ret []KeyedFile
+	if err := walk(dir, "", &ret); err != nil { // "" as key prefix: top-level files are keyed by their plain name
+		return nil, fmt.Errorf("failed to scan %v for artifacts to stage: %v", dir, err)
+	}
+	return ret, nil
+}
+
+// walk recursively appends all non-directory entries under dir to accum. Each
+// entry is keyed by key joined with its '/'-separated path relative to dir.
+// It stops at the first directory that cannot be read and returns that error,
+// so that an unreadable subdirectory is never silently skipped.
+func walk(dir, key string, accum *[]KeyedFile) error {
+	list, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+
+	for _, elm := range list {
+		k := makeKey(key, elm.Name())
+		f := filepath.Join(dir, elm.Name())
+
+		if elm.IsDir() {
+			// The error was previously dropped here, which silently left out
+			// the files of any subdirectory that failed to scan.
+			if err := walk(f, k, accum); err != nil {
+				return err
+			}
+			continue
+		}
+		*accum = append(*accum, KeyedFile{k, f})
+	}
+	return nil
+}
+
+func makeKey(prefix, name string) string { // appends name to the key prefix
+	if prefix == "" {
+		return name
+	}
+	return path.Join(prefix, name) // path, not filepath: keys are '/'-separated on every OS
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/artifact/stage_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/stage_test.go b/sdks/go/pkg/beam/artifact/stage_test.go
new file mode 100644
index 0000000..d1b32b6
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/stage_test.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
+)
+
+// TestStage verifies that local files can be staged correctly.
+func TestStage(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+	client := pb.NewArtifactStagingServiceClient(cc)
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	keys := []string{"foo", "bar", "baz/baz/baz"}
+
+	src := makeTempDir(t)
+	defer os.RemoveAll(src)
+	md5s := makeTempFiles(t, src, keys, 300)
+
+	var artifacts []*pb.ArtifactMetadata
+	for _, key := range keys {
+		a, err := Stage(ctx, client, key, makeFilename(src, key))
+		if err != nil {
+			t.Fatalf("failed to stage %v: %v", key, err) // abort: a is nil here and must not end up in the committed manifest
+		}
+		artifacts = append(artifacts, a)
+	}
+	if _, err := Commit(ctx, client, artifacts); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	validate(ctx, cc, t, keys, md5s)
+}
+
+// TestStageDir validates that local files can be staged concurrently.
+func TestStageDir(t *testing.T) {
+	cc := startServer(t)
+	defer cc.Close()
+	client := pb.NewArtifactStagingServiceClient(cc)
+
+	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	keys := []string{"1", "2", "3", "4", "a/5", "a/6", "a/7", "a/8", "a/a/9", "a/a/10", "a/b/11", "a/b/12"}
+
+	src := makeTempDir(t)
+	defer os.RemoveAll(src)
+	md5s := makeTempFiles(t, src, keys, 300)
+
+	artifacts, err := StageDir(ctx, client, src)
+	if err != nil {
+		t.Fatalf("failed to stage dir %v: %v", src, err) // abort: committing a partial artifact list only hides the real failure
+	}
+	if _, err := Commit(ctx, client, artifacts); err != nil {
+		t.Fatalf("failed to commit: %v", err)
+	}
+
+	validate(ctx, cc, t, keys, md5s)
+}
+
+func validate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys, md5s []string) { // checks that artifact keys[i] is retrievable and has MD5 md5s[i]
+	rcl := pb.NewArtifactRetrievalServiceClient(cc)
+
+	for i, key := range keys {
+		stream, err := rcl.GetArtifact(ctx, &pb.GetArtifactRequest{Name: key})
+		if err != nil {
+			t.Fatalf("failed to get artifact for %v: %v", key, err)
+		}
+
+		hash, err := retrieveChunks(stream, ioutil.Discard) // only the hash matters; the content is discarded
+		if err != nil {
+			t.Fatalf("failed to get chunks for %v: %v", key, err)
+		}
+		if hash != md5s[i] {
+			t.Errorf("incorrect MD5: %v, want %v", hash, md5s[i])
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c18f15cd/sdks/go/pkg/beam/model/gen.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/gen.go b/sdks/go/pkg/beam/model/gen.go
new file mode 100644
index 0000000..9bff5e0
--- /dev/null
+++ b/sdks/go/pkg/beam/model/gen.go
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package model
+
+// TODO(herohde) 9/1/2017: for now, install protoc as described on grpc.io before running go generate.
+
+//go:generate protoc -I../../../../common/runner-api/src/main/proto ../../../../common/runner-api/src/main/proto/beam_artifact_api.proto --go_out=org_apache_beam_runner_v1,plugins=grpc:org_apache_beam_runner_v1
+//go:generate protoc -I../../../../common/fn-api/src/main/proto ../../../../common/fn-api/src/main/proto/beam_provision_api.proto --go_out=org_apache_beam_fn_v1,plugins=grpc:org_apache_beam_fn_v1