You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/12/21 23:05:35 UTC

[beam] branch master updated: [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 45df0ac6 [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
45df0ac6 is described below

commit 45df0ac6a3cf28e5b8ebf8a6899e3640c520f09d
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Tue Dec 21 18:04:32 2021 -0500

    [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
---
 .../beam/core/runtime/xlangx/expansionx/process.go |  4 ++
 sdks/go/test/integration/xlang/expansion_test.go   | 68 ++++++++++++++++++++++
 2 files changed, 72 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
index 935aeff..fa94835 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
@@ -18,6 +18,7 @@ package expansionx
 import (
 	"fmt"
 	"os/exec"
+	"time"
 )
 
 // ExpansionServiceRunner is a type that holds information required to
@@ -51,6 +52,9 @@ func (e *ExpansionServiceRunner) StartService() error {
 	if err != nil {
 		return err
 	}
+	// Start() is non-blocking so a brief sleep to let the JAR start up and begin accepting
+	// connections is necessary.
+	time.Sleep(2 * time.Second)
 	if e.serviceCommand.ProcessState != nil {
 		return fmt.Errorf("process %v exited when it should still be running", e.serviceCommand.Process)
 	}
diff --git a/sdks/go/test/integration/xlang/expansion_test.go b/sdks/go/test/integration/xlang/expansion_test.go
new file mode 100644
index 0000000..070fccf
--- /dev/null
+++ b/sdks/go/test/integration/xlang/expansion_test.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlang
+
+import (
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
+	"github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+const (
+	// TODO(BEAN-13505): Select the most recent Beam release instead of a hard-coded
+	// string.
+	beamVersion   = "2.34.0"
+	gradleTarget  = ":sdks:java:io:expansion-service:runExpansionService"
+	expansionPort = "8097"
+)
+
+func checkPort(t *testing.T, port string) {
+	ping := exec.Command("nc", "-vz", "localhost", port)
+	output, err := ping.CombinedOutput()
+	if err != nil {
+		t.Errorf("failed to run ping to localhost:%v", port)
+	}
+	outputStr := string(output)
+	if strings.Contains(outputStr, "failed") {
+		t.Errorf("failed to connect to localhost:%v, got err %v", port, outputStr)
+	}
+}
+
+func TestAutomatedExpansionService(t *testing.T) {
+	integration.CheckFilters(t)
+	jarPath, err := expansionx.GetBeamJar(gradleTarget, beamVersion)
+	if err != nil {
+		t.Fatalf("failed to get JAR path, got %v", err)
+	}
+	t.Cleanup(func() { os.Remove(jarPath) })
+
+	serviceRunner := expansionx.NewExpansionServiceRunner(jarPath, expansionPort)
+	err = serviceRunner.StartService()
+	if err != nil {
+		t.Errorf("failed to start expansion service JAR, got %v", err)
+	}
+
+	checkPort(t, expansionPort)
+
+	err = serviceRunner.StopService()
+	if err != nil {
+		t.Errorf("failed to stop expansion service JAR, got %v", err)
+	}
+}