You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/11/18 19:44:52 UTC

[GitHub] [beam] lostluck commented on a change in pull request #13362: [BEAM-11075] Pardo tests for Go SDK

lostluck commented on a change in pull request #13362:
URL: https://github.com/apache/beam/pull/13362#discussion_r526322639



##########
File path: .test-infra/jenkins/CommonTestProperties.groovy
##########
@@ -21,17 +21,36 @@
 class CommonTestProperties {
   enum SDK {
     PYTHON,
-    JAVA
+    JAVA,
+    GO,
   }
 
   enum Runner {
-    DATAFLOW("DataflowRunner"),
-    TEST_DATAFLOW("TestDataflowRunner"),
-    SPARK("SparkRunner"),
-    SPARK_STRUCTURED_STREAMING("SparkStructuredStreamingRunner"),
-    FLINK("FlinkRunner"),
-    DIRECT("DirectRunner"),
-    PORTABLE("PortableRunner")
+    DATAFLOW,
+    TEST_DATAFLOW,
+    SPARK,
+    SPARK_STRUCTURED_STREAMING,
+    FLINK,
+    DIRECT,
+    PORTABLE,
+
+    def runnerNames = [
+      DATAFLOW: "DataflowRunner",
+      TEST_DATAFLOW: "TestDataflowRunner",
+      SPARK: "SparkRunner",
+      SPARK_STRUCTURED_STREAMING: "SparkStructuredStreamingRunner",
+      FLINK: "FlinkRunner",
+      DIRECT: "DirectRunner",
+      PORTABLE: "PortableRunner",
+    ]
+
+    def goRunnerNames = [
+      DATAFLOW: "dataflow",
+      SPARK: "spark",
+      FLINK: "flink",
+      DIRECT: "direct",
+      PORTABLE: "universal",

Review comment:
       No need to do it now, but for compatibility and infrastructure convenience there's no reason we can't re-register the various execution functions under additional aliases that match the Java and Python runner names, which appear to require full class names.
   
   That is, there's no reason why we can't have a package that imports all the runners and adds the aliases for these load tests (or have each runner package register the aliases itself for general use).
   
   ```
   // if done in a separate package
   func init() {
     beam.RegisterRunner("SparkRunner", spark.Execute)
   }
   ```
   
   ```
   // if done in the spark runner package
   func init() {
     beam.RegisterRunner("spark", Execute)
     beam.RegisterRunner("SparkRunner", Execute)
   }
   ```

##########
File path: sdks/go/test/load/util.go
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package load
+
+import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"os"
+	"reflect"
+	"strings"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+const (
+	runtimeMetricNamespace = "RuntimeMonitor"
+	runtimeMetricName      = "runtime"
+)
+
+var (
+	influxMeasurement = flag.String(
+		"influx_measurement",
+		"",
+		`An InfluxDB measurement where metrics should be published to.
+		If empty, no metrics will be send to InfluxDB.`)
+	influxDatabase = flag.String(
+		"influx_db_name",
+		"",
+		"InfluxDB database name. If empty, no metrics will be send to InfluxDB.")
+	influxHost = flag.String(
+		"influx_hostname",
+		"http://localhost:8086",
+		"Hostname and port to connect to InfluxDB. Defaults to http://localhost:8086.")
+	influxNamespace = flag.String(
+		"influx_namespace",
+		"",
+		`A namespace to be used when constructing InfluxDB's data points.
+		Used to make some points different from others within the same measurement.`)
+	runtime = beam.NewDistribution(runtimeMetricNamespace, runtimeMetricName)
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
+}
+
+// RuntimeMonitor is a DoFn to record processing time in the pipeline.
+//
+// It uses a distribution metric which is updated every time new bundle
+// starts or finishes. The processing time can be extracted by calculating
+// the difference of the maximum and the minimum value of the distribution
+// metric.
+type RuntimeMonitor struct{}
+
+// StartBundle updates a distribution metric.
+func (fn *RuntimeMonitor) StartBundle(ctx context.Context, emit func([]byte, []byte)) {
+	runtime.Update(ctx, time.Now().Unix())

Review comment:
       Note: Unix() only provides second granularity. If seconds are too coarse (or don't match what Java and Python report), consider using UnixNano() and converting to the correct granularity (millis or micros).

##########
File path: sdks/go/test/load/util.go
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package load
+
+import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"os"
+	"reflect"
+	"strings"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+const (
+	runtimeMetricNamespace = "RuntimeMonitor"
+	runtimeMetricName      = "runtime"
+)
+
+var (
+	influxMeasurement = flag.String(
+		"influx_measurement",
+		"",
+		`An InfluxDB measurement where metrics should be published to.
+		If empty, no metrics will be send to InfluxDB.`)
+	influxDatabase = flag.String(
+		"influx_db_name",
+		"",
+		"InfluxDB database name. If empty, no metrics will be send to InfluxDB.")
+	influxHost = flag.String(
+		"influx_hostname",
+		"http://localhost:8086",
+		"Hostname and port to connect to InfluxDB. Defaults to http://localhost:8086.")
+	influxNamespace = flag.String(
+		"influx_namespace",
+		"",
+		`A namespace to be used when constructing InfluxDB's data points.
+		Used to make some points different from others within the same measurement.`)
+	runtime = beam.NewDistribution(runtimeMetricNamespace, runtimeMetricName)
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
+}
+
+// RuntimeMonitor is a DoFn to record processing time in the pipeline.
+//
+// It uses a distribution metric which is updated every time new bundle

Review comment:
       ```suggestion
   // It uses a distribution metric which is updated every time a new bundle
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org