You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/12/21 23:05:35 UTC
[beam] branch master updated: [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 45df0ac6 [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
45df0ac6 is described below
commit 45df0ac6a3cf28e5b8ebf8a6899e3640c520f09d
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Tue Dec 21 18:04:32 2021 -0500
[BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
---
.../beam/core/runtime/xlangx/expansionx/process.go | 4 ++
sdks/go/test/integration/xlang/expansion_test.go | 68 ++++++++++++++++++++++
2 files changed, 72 insertions(+)
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
index 935aeff..fa94835 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
@@ -18,6 +18,7 @@ package expansionx
import (
"fmt"
"os/exec"
+ "time"
)
// ExpansionServiceRunner is a type that holds information required to
@@ -51,6 +52,9 @@ func (e *ExpansionServiceRunner) StartService() error {
if err != nil {
return err
}
+ // Start() is non-blocking so a brief sleep to let the JAR start up and begin accepting
+ // connections is necessary.
+ time.Sleep(2 * time.Second)
if e.serviceCommand.ProcessState != nil {
return fmt.Errorf("process %v exited when it should still be running", e.serviceCommand.Process)
}
diff --git a/sdks/go/test/integration/xlang/expansion_test.go b/sdks/go/test/integration/xlang/expansion_test.go
new file mode 100644
index 0000000..070fccf
--- /dev/null
+++ b/sdks/go/test/integration/xlang/expansion_test.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlang
+
+import (
+ "os"
+ "os/exec"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
+ "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+const (
+ // TODO(BEAN-13505): Select the most recent Beam release instead of a hard-coded
+ // string.
+ beamVersion = "2.34.0"
+ gradleTarget = ":sdks:java:io:expansion-service:runExpansionService"
+ expansionPort = "8097"
+)
+
+func checkPort(t *testing.T, port string) {
+ ping := exec.Command("nc", "-vz", "localhost", port)
+ output, err := ping.CombinedOutput()
+ if err != nil {
+ t.Errorf("failed to run ping to localhost:%v", port)
+ }
+ outputStr := string(output)
+ if strings.Contains(outputStr, "failed") {
+ t.Errorf("failed to connect to localhost:%v, got err %v", port, outputStr)
+ }
+}
+
+func TestAutomatedExpansionService(t *testing.T) {
+ integration.CheckFilters(t)
+ jarPath, err := expansionx.GetBeamJar(gradleTarget, beamVersion)
+ if err != nil {
+ t.Fatalf("failed to get JAR path, got %v", err)
+ }
+ t.Cleanup(func() { os.Remove(jarPath) })
+
+ serviceRunner := expansionx.NewExpansionServiceRunner(jarPath, expansionPort)
+ err = serviceRunner.StartService()
+ if err != nil {
+ t.Errorf("failed to start expansion service JAR, got %v", err)
+ }
+
+ checkPort(t, expansionPort)
+
+ err = serviceRunner.StopService()
+ if err != nil {
+ t.Errorf("failed to stop expansion service JAR, got %v", err)
+ }
+}