You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/06 21:18:45 UTC

[2/5] beam git commit: [BEAM-2877] Minor Go fixes for typos, proto changes and beamctl structure

[BEAM-2877] Minor Go fixes for typos, proto changes and beamctl structure


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4085f93
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4085f93
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4085f93

Branch: refs/heads/master
Commit: c4085f9389dd44e1d0db8ccded5bdd85e2c3e8fa
Parents: d8f71ff
Author: Henning Rohde <he...@google.com>
Authored: Fri Oct 6 13:29:11 2017 -0700
Committer: Henning Rohde <he...@google.com>
Committed: Fri Oct 6 13:30:44 2017 -0700

----------------------------------------------------------------------
 sdks/go/cmd/beamctl/artifact.go                 | 98 --------------------
 sdks/go/cmd/beamctl/cmd/artifact.go             | 98 ++++++++++++++++++++
 sdks/go/cmd/beamctl/cmd/root.go                 | 55 +++++++++++
 sdks/go/cmd/beamctl/main.go                     | 37 +-------
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go   |  2 +-
 sdks/go/pkg/beam/artifact/materialize_test.go   |  6 +-
 sdks/go/pkg/beam/artifact/server_test.go        |  8 +-
 sdks/go/pkg/beam/artifact/stage_test.go         |  4 +-
 .../beam_artifact_api.pb.go                     |  4 +-
 sdks/go/pkg/beam/util/grpcx/metadata.go         |  4 +-
 sdks/go/pom.xml                                 |  6 +-
 sdks/java/container/boot.go                     |  6 +-
 12 files changed, 175 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/cmd/beamctl/artifact.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/artifact.go b/sdks/go/cmd/beamctl/artifact.go
deleted file mode 100644
index d8c2c37..0000000
--- a/sdks/go/cmd/beamctl/artifact.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
-	"path/filepath"
-
-	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
-	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
-	"github.com/spf13/cobra"
-)
-
-var (
-	artifactCmd = &cobra.Command{
-		Use:   "artifact",
-		Short: "Artifact commands",
-	}
-
-	stageCmd = &cobra.Command{
-		Use:   "stage",
-		Short: "Stage local files as artifacts",
-		RunE:  stageFn,
-		Args:  cobra.MinimumNArgs(1),
-	}
-
-	listCmd = &cobra.Command{
-		Use:   "list",
-		Short: "List artifacts",
-		RunE:  listFn,
-		Args:  cobra.NoArgs,
-	}
-)
-
-func init() {
-	artifactCmd.AddCommand(stageCmd, listCmd)
-}
-
-func stageFn(cmd *cobra.Command, args []string) error {
-	ctx, cc, err := dial()
-	if err != nil {
-		return err
-	}
-	defer cc.Close()
-
-	// (1) Use flat filename as key.
-
-	var files []artifact.KeyedFile
-	for _, arg := range args {
-		files = append(files, artifact.KeyedFile{Key: filepath.Base(arg), Filename: arg})
-	}
-
-	// (2) Stage files in parallel, commit and print out token
-
-	client := pb.NewArtifactStagingServiceClient(cc)
-	list, err := artifact.MultiStage(ctx, client, 10, files)
-	if err != nil {
-		return err
-	}
-	token, err := artifact.Commit(ctx, client, list)
-	if err != nil {
-		return err
-	}
-
-	cmd.Println(token)
-	return nil
-}
-
-func listFn(cmd *cobra.Command, args []string) error {
-	ctx, cc, err := dial()
-	if err != nil {
-		return err
-	}
-	defer cc.Close()
-
-	client := pb.NewArtifactRetrievalServiceClient(cc)
-	md, err := client.GetManifest(ctx, &pb.GetManifestRequest{})
-	if err != nil {
-		return err
-	}
-
-	for _, a := range md.GetManifest().GetArtifact() {
-		cmd.Println(a.Name)
-	}
-	return nil
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/cmd/beamctl/cmd/artifact.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/cmd/artifact.go b/sdks/go/cmd/beamctl/cmd/artifact.go
new file mode 100644
index 0000000..9898282
--- /dev/null
+++ b/sdks/go/cmd/beamctl/cmd/artifact.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"path/filepath"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/org_apache_beam_runner_v1"
+	"github.com/spf13/cobra"
+)
+
+var (
+	artifactCmd = &cobra.Command{
+		Use:   "artifact",
+		Short: "Artifact commands",
+	}
+
+	stageCmd = &cobra.Command{
+		Use:   "stage",
+		Short: "Stage local files as artifacts",
+		RunE:  stageFn,
+		Args:  cobra.MinimumNArgs(1),
+	}
+
+	listCmd = &cobra.Command{
+		Use:   "list",
+		Short: "List artifacts",
+		RunE:  listFn,
+		Args:  cobra.NoArgs,
+	}
+)
+
+func init() {
+	artifactCmd.AddCommand(stageCmd, listCmd)
+}
+
+func stageFn(cmd *cobra.Command, args []string) error {
+	ctx, cc, err := dial()
+	if err != nil {
+		return err
+	}
+	defer cc.Close()
+
+	// (1) Use flat filename as key.
+
+	var files []artifact.KeyedFile
+	for _, arg := range args {
+		files = append(files, artifact.KeyedFile{Key: filepath.Base(arg), Filename: arg})
+	}
+
+	// (2) Stage files in parallel, commit and print out token
+
+	client := pb.NewArtifactStagingServiceClient(cc)
+	list, err := artifact.MultiStage(ctx, client, 10, files)
+	if err != nil {
+		return err
+	}
+	token, err := artifact.Commit(ctx, client, list)
+	if err != nil {
+		return err
+	}
+
+	cmd.Println(token)
+	return nil
+}
+
+func listFn(cmd *cobra.Command, args []string) error {
+	ctx, cc, err := dial()
+	if err != nil {
+		return err
+	}
+	defer cc.Close()
+
+	client := pb.NewArtifactRetrievalServiceClient(cc)
+	md, err := client.GetManifest(ctx, &pb.GetManifestRequest{})
+	if err != nil {
+		return err
+	}
+
+	for _, a := range md.GetManifest().GetArtifact() {
+		cmd.Println(a.Name)
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/cmd/beamctl/cmd/root.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/cmd/root.go b/sdks/go/cmd/beamctl/cmd/root.go
new file mode 100644
index 0000000..53ee83c
--- /dev/null
+++ b/sdks/go/cmd/beamctl/cmd/root.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package cmd contains the commands for beamctl.
+package cmd
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/spf13/cobra"
+	"google.golang.org/grpc"
+)
+
+var (
+	RootCmd = &cobra.Command{
+		Use:   "beamctl",
+		Short: "Apache Beam command line client",
+	}
+
+	id       string
+	endpoint string
+)
+
+func init() {
+	RootCmd.AddCommand(artifactCmd)
+	RootCmd.PersistentFlags().StringVarP(&endpoint, "endpoint", "e", "", "Server endpoint, such as localhost:123")
+	RootCmd.PersistentFlags().StringVarP(&id, "id", "i", "", "Client ID")
+}
+
+// dial connects via gRPC to the given endpoint and returns the connection
+// and the context to use.
+func dial() (context.Context, *grpc.ClientConn, error) {
+	if endpoint == "" {
+		return nil, nil, errors.New("endpoint not defined")
+	}
+
+	ctx := grpcx.WriteWorkerID(context.Background(), id)
+	cc, err := grpcx.Dial(ctx, endpoint, time.Minute)
+	return ctx, cc, err
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/cmd/beamctl/main.go
----------------------------------------------------------------------
diff --git a/sdks/go/cmd/beamctl/main.go b/sdks/go/cmd/beamctl/main.go
index 9ce47a7..7d6ae8a 100644
--- a/sdks/go/cmd/beamctl/main.go
+++ b/sdks/go/cmd/beamctl/main.go
@@ -17,48 +17,15 @@
 package main
 
 import (
-	"context"
-	"errors"
 	"fmt"
 	"os"
-	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
-	"github.com/spf13/cobra"
-	"google.golang.org/grpc"
+	"github.com/apache/beam/sdks/go/cmd/beamctl/cmd"
 )
 
-var (
-	rootCmd = &cobra.Command{
-		Use:   "beamctl",
-		Short: "Apache Beam command line client",
-	}
-
-	id       string
-	endpoint string
-)
-
-func init() {
-	rootCmd.AddCommand(artifactCmd)
-	rootCmd.PersistentFlags().StringVarP(&endpoint, "endpoint", "e", "", "Server endpoint, such as localhost:123")
-	rootCmd.PersistentFlags().StringVarP(&id, "id", "i", "", "Client ID")
-}
-
 func main() {
-	if err := rootCmd.Execute(); err != nil {
+	if err := cmd.RootCmd.Execute(); err != nil {
 		fmt.Println(err)
 		os.Exit(1)
 	}
 }
-
-// dial connects via gRPC to the given endpoint and returns the connection
-// and the context to use.
-func dial() (context.Context, *grpc.ClientConn, error) {
-	if endpoint == "" {
-		return nil, nil, errors.New("endpoint not defined")
-	}
-
-	ctx := grpcx.WriteWorkerId(context.Background(), id)
-	cc, err := grpcx.Dial(ctx, endpoint, time.Minute)
-	return ctx, cc, err
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index 3c67b1a..c751d36 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -33,7 +33,7 @@ import (
 	"google.golang.org/api/storage/v1"
 )
 
-// StaginServer is a artifact staging server backed by Google Cloud Storage
+// StagingServer is an artifact staging server backed by Google Cloud Storage
 // (GCS). It commits a single manifest and ignores the staging id.
 type StagingServer struct {
 	manifest     string

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/artifact/materialize_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go
index 5d35512..37f6c22 100644
--- a/sdks/go/pkg/beam/artifact/materialize_test.go
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -34,7 +34,7 @@ func TestRetrieve(t *testing.T) {
 	cc := startServer(t)
 	defer cc.Close()
 
-	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	ctx := grpcx.WriteWorkerID(context.Background(), "idA")
 	keys := []string{"foo", "bar", "baz/baz/baz"}
 	artifacts := populate(ctx, cc, t, keys, 300)
 
@@ -58,7 +58,7 @@ func TestMultiRetrieve(t *testing.T) {
 	cc := startServer(t)
 	defer cc.Close()
 
-	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	ctx := grpcx.WriteWorkerID(context.Background(), "idB")
 	keys := []string{"1", "2", "3", "4", "a/5", "a/6", "a/7", "a/8", "a/a/9", "a/a/10", "a/b/11", "a/b/12"}
 	artifacts := populate(ctx, cc, t, keys, 300)
 
@@ -81,7 +81,7 @@ func TestDirtyRetrieve(t *testing.T) {
 	cc := startServer(t)
 	defer cc.Close()
 
-	ctx := grpcx.WriteWorkerId(context.Background(), "idC")
+	ctx := grpcx.WriteWorkerID(context.Background(), "idC")
 	scl := pb.NewArtifactStagingServiceClient(cc)
 
 	list := []*pb.ArtifactMetadata{

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/artifact/server_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/server_test.go b/sdks/go/pkg/beam/artifact/server_test.go
index c24e308..d5d5496 100644
--- a/sdks/go/pkg/beam/artifact/server_test.go
+++ b/sdks/go/pkg/beam/artifact/server_test.go
@@ -72,7 +72,7 @@ type server struct {
 }
 
 func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) error {
-	id, err := grpcx.ReadWorkerId(ps.Context())
+	id, err := grpcx.ReadWorkerID(ps.Context())
 	if err != nil {
 		return fmt.Errorf("expected worker id: %v", err)
 	}
@@ -122,7 +122,7 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
 }
 
 func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
-	id, err := grpcx.ReadWorkerId(ctx)
+	id, err := grpcx.ReadWorkerID(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("expected worker id: %v", err)
 	}
@@ -151,7 +151,7 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
 }
 
 func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
-	id, err := grpcx.ReadWorkerId(ctx)
+	id, err := grpcx.ReadWorkerID(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("expected worker id: %v", err)
 	}
@@ -167,7 +167,7 @@ func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*
 }
 
 func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
-	id, err := grpcx.ReadWorkerId(stream.Context())
+	id, err := grpcx.ReadWorkerID(stream.Context())
 	if err != nil {
 		return fmt.Errorf("expected worker id: %v", err)
 	}

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/artifact/stage_test.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/artifact/stage_test.go b/sdks/go/pkg/beam/artifact/stage_test.go
index d1b32b6..a371443 100644
--- a/sdks/go/pkg/beam/artifact/stage_test.go
+++ b/sdks/go/pkg/beam/artifact/stage_test.go
@@ -32,7 +32,7 @@ func TestStage(t *testing.T) {
 	defer cc.Close()
 	client := pb.NewArtifactStagingServiceClient(cc)
 
-	ctx := grpcx.WriteWorkerId(context.Background(), "idA")
+	ctx := grpcx.WriteWorkerID(context.Background(), "idA")
 	keys := []string{"foo", "bar", "baz/baz/baz"}
 
 	src := makeTempDir(t)
@@ -60,7 +60,7 @@ func TestStageDir(t *testing.T) {
 	defer cc.Close()
 	client := pb.NewArtifactStagingServiceClient(cc)
 
-	ctx := grpcx.WriteWorkerId(context.Background(), "idB")
+	ctx := grpcx.WriteWorkerID(context.Background(), "idB")
 	keys := []string{"1", "2", "3", "4", "a/5", "a/6", "a/7", "a/8", "a/a/9", "a/a/10", "a/b/11", "a/b/12"}
 
 	src := makeTempDir(t)

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go b/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
index e8cc800..62c29b5 100644
--- a/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
+++ b/sdks/go/pkg/beam/model/org_apache_beam_runner_v1/beam_artifact_api.pb.go
@@ -63,8 +63,8 @@ type ArtifactMetadata struct {
 	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
 	// (Optional) The Unix-like permissions of the artifact
 	Permissions uint32 `protobuf:"varint,2,opt,name=permissions" json:"permissions,omitempty"`
-	// (Optional) The md5 checksum of the artifact. Used, among other things, by harness boot code to
-	// validate the integrity of the artifact.
+	// (Optional) The base64-encoded md5 checksum of the artifact. Used, among other things, by
+	// harness boot code to validate the integrity of the artifact.
 	Md5 string `protobuf:"bytes,3,opt,name=md5" json:"md5,omitempty"`
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pkg/beam/util/grpcx/metadata.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/util/grpcx/metadata.go b/sdks/go/pkg/beam/util/grpcx/metadata.go
index 08512c7..eed51ed 100644
--- a/sdks/go/pkg/beam/util/grpcx/metadata.go
+++ b/sdks/go/pkg/beam/util/grpcx/metadata.go
@@ -27,7 +27,7 @@ import (
 const idKey = "id"
 
 // ReadWorkerID reads the worker ID from an incoming gRPC request context.
-func ReadWorkerId(ctx context.Context) (string, error) {
+func ReadWorkerID(ctx context.Context) (string, error) {
 	md, ok := metadata.FromIncomingContext(ctx)
 	if !ok {
 		return "", errors.New("failed to read metadata from context")
@@ -44,7 +44,7 @@ func ReadWorkerId(ctx context.Context) (string, error) {
 
 // WriteWorkerID write the worker ID to an outgoing gRPC request context. It
 // merges the information with any existing gRPC metadata.
-func WriteWorkerId(ctx context.Context, id string) context.Context {
+func WriteWorkerID(ctx context.Context, id string) context.Context {
 	md := metadata.New(map[string]string{
 		idKey: id,
 	})

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/go/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/go/pom.xml b/sdks/go/pom.xml
index c072b9f..207c53d 100644
--- a/sdks/go/pom.xml
+++ b/sdks/go/pom.xml
@@ -66,7 +66,7 @@
               <goal>copy-resources</goal>
             </goals>
             <configuration>
-              <outputDirectory>${go.source.base}/github.com/apache/beam/cmd</outputDirectory>
+              <outputDirectory>${go.source.dir}/cmd</outputDirectory>
               <resources>
                 <resource>
                   <directory>cmd</directory>
@@ -124,7 +124,7 @@
             <phase>compile</phase>
             <configuration>
               <packages>
-                <package>github.com/apache/beam/cmd/beamctl</package>
+                <package>github.com/apache/beam/sdks/go/cmd/beamctl</package>
               </packages>
               <resultName>beamctl</resultName>
             </configuration>
@@ -137,7 +137,7 @@
             <phase>compile</phase>
             <configuration>
               <packages>
-                <package>github.com/apache/beam/cmd/beamctl</package>
+                <package>github.com/apache/beam/sdks/go/cmd/beamctl</package>
               </packages>
               <resultName>linux_amd64/beamctl</resultName>
               <targetArch>amd64</targetArch>

http://git-wip-us.apache.org/repos/asf/beam/blob/c4085f93/sdks/java/container/boot.go
----------------------------------------------------------------------
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 8c465c3..2e140a1 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -63,7 +63,7 @@ func main() {
 
 	log.Printf("Initializing java harness: %v", strings.Join(os.Args, " "))
 
-	ctx := grpcx.WriteWorkerId(context.Background(), *id)
+	ctx := grpcx.WriteWorkerID(context.Background(), *id)
 
 	// (1) Obtain the pipeline options
 
@@ -88,8 +88,8 @@ func main() {
 	// (3) Invoke the Java harness, preserving artifact ordering in classpath.
 
 	os.Setenv("PIPELINE_OPTIONS", options)
-	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", fmt.Sprintf("id: \"1\"\nurl: \"%v\"\n", *loggingEndpoint))
-	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", fmt.Sprintf("id: \"2\"\nurl: \"%v\"\n", *controlEndpoint))
+	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *loggingEndpoint))
+	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", fmt.Sprintf("url: \"%v\"\n", *controlEndpoint))
 
 	const jarsDir = "/opt/apache/beam/jars"
 	cp := []string{