You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/26 18:40:44 UTC
[1/3] beam git commit: Add missing @RunWith to test.
Repository: beam
Updated Branches:
refs/heads/master cde3b7b2d -> e01c78da7
Add missing @RunWith to test.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77d00584
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77d00584
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77d00584
Branch: refs/heads/master
Commit: 77d00584a7b18c1442a190283133202c4bb33916
Parents: e5290f1
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 19 15:04:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 26 11:39:53 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/ArtifactServiceStagerTest.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/77d00584/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
index 13bd8dd..ffd023e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
@@ -45,10 +45,13 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
/**
* Tests for {@link ArtifactServiceStager}.
*/
+@RunWith(JUnit4.class)
public class ArtifactServiceStagerTest {
@Rule public TemporaryFolder temp = new TemporaryFolder();
[3/3] beam git commit: [BEAM-2566] Decouple SDK harness from Dataflow
runner by elevating experiments and SDK harness configuration to
java-sdk-core.
Posted by lc...@apache.org.
[BEAM-2566] Decouple SDK harness from Dataflow runner by elevating experiments and SDK harness configuration to java-sdk-core.
This closes #4016
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e01c78da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e01c78da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e01c78da
Branch: refs/heads/master
Commit: e01c78da7d713a23910c4bfb27d4177f6247098a
Parents: cde3b7b 77d0058
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 26 11:40:32 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 26 11:40:32 2017 -0700
----------------------------------------------------------------------
.../construction/ArtifactServiceStagerTest.java | 3 +
.../options/DataflowPipelineDebugOptions.java | 23 +--
.../options/DataflowWorkerLoggingOptions.java | 6 +
.../DefaultPipelineOptionsRegistrar.java | 2 +
.../beam/sdk/options/ExperimentalOptions.java | 38 ++++
.../beam/sdk/options/SdkHarnessOptions.java | 173 +++++++++++++++++++
.../beam/sdk/options/SdkHarnessOptionsTest.java | 76 ++++++++
sdks/java/harness/pom.xml | 6 -
.../harness/channel/ManagedChannelFactory.java | 4 +-
.../BeamFnDataBufferingOutboundObserver.java | 4 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 30 ++--
.../harness/stream/StreamObserverFactory.java | 4 +-
.../logging/BeamFnLoggingClientTest.java | 12 +-
13 files changed, 327 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-2566] Decouple SDK harness from Dataflow
runner by elevating experiments and SDK harness configuration to
java-sdk-core.
Posted by lc...@apache.org.
[BEAM-2566] Decouple SDK harness from Dataflow runner by elevating experiments and SDK harness configuration to java-sdk-core.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5290f1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5290f1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5290f1a
Branch: refs/heads/master
Commit: e5290f1ad82192e85d4e7a838d94ee771d9e4f7b
Parents: cde3b7b
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 19 11:26:04 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 26 11:39:53 2017 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 23 +--
.../options/DataflowWorkerLoggingOptions.java | 6 +
.../DefaultPipelineOptionsRegistrar.java | 2 +
.../beam/sdk/options/ExperimentalOptions.java | 38 ++++
.../beam/sdk/options/SdkHarnessOptions.java | 173 +++++++++++++++++++
.../beam/sdk/options/SdkHarnessOptionsTest.java | 76 ++++++++
sdks/java/harness/pom.xml | 6 -
.../harness/channel/ManagedChannelFactory.java | 4 +-
.../BeamFnDataBufferingOutboundObserver.java | 4 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 30 ++--
.../harness/stream/StreamObserverFactory.java | 4 +-
.../logging/BeamFnLoggingClientTest.java | 12 +-
12 files changed, 324 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index d0ea722..ec108da 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -19,16 +19,14 @@ package org.apache.beam.runners.dataflow.options;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.api.services.dataflow.Dataflow;
-import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
-import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -40,24 +38,7 @@ import org.apache.beam.sdk.util.InstanceBuilder;
@Description("[Internal] Options used to control execution of the Dataflow SDK for "
+ "debugging and testing purposes.")
@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
- /**
- * The list of backend experiments to enable.
- *
- * <p>Dataflow provides a number of experimental features that can be enabled
- * with this flag.
- *
- * <p>Please sync with the Dataflow team before enabling any experiments.
- */
- @Description("[Experimental] Dataflow provides a number of experimental features that can "
- + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
- + "experiments.")
- @Experimental
- @Nullable
- List<String> getExperiments();
- void setExperiments(@Nullable List<String> value);
-
+public interface DataflowPipelineDebugOptions extends ExperimentalOptions, PipelineOptions {
/**
* The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
* if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
index fae851c..a419b76 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -29,8 +29,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
/**
* Options that are used to control logging configuration on the Dataflow worker.
+ *
+ * @deprecated This interface will no longer be the source of truth for worker logging configuration
+ * once jobs are executed using a dedicated SDK harness instead of user code being co-located
+ * alongside Dataflow worker code. Please set the option below and also the corresponding option
+ * within {@link org.apache.beam.sdk.options.SdkHarnessOptions} to ensure forward compatibility.
*/
@Description("Options that are used to control logging configuration on the Dataflow worker.")
+@Deprecated
public interface DataflowWorkerLoggingOptions extends PipelineOptions {
/**
* The set of log levels that can be used on the Dataflow worker.
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
index 3375dc7..39debb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
@@ -33,6 +33,8 @@ public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar
.add(PipelineOptions.class)
.add(ApplicationNameOptions.class)
.add(StreamingOptions.class)
+ .add(ExperimentalOptions.class)
+ .add(SdkHarnessOptions.class)
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
new file mode 100644
index 0000000..cb5c41c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Apache Beam provides a number of experimental features that can
+ * be enabled with this flag. If executing against a managed service, please contact the
+ * service owners before enabling any experiments.
+ */
+@Experimental
+@Hidden
+public interface ExperimentalOptions extends PipelineOptions {
+ @Description("[Experimental] Apache Beam provides a number of experimental features that can "
+ + "be enabled with this flag. If executing against a managed service, please contact the "
+ + "service owners before enabling any experiments.")
+ @Nullable
+ List<String> getExperiments();
+ void setExperiments(@Nullable List<String> value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
new file mode 100644
index 0000000..5f5dd6e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Options that are used to control configuration of the SDK harness.
+ */
+@Experimental
+@Description("Options that are used to control configuration of the SDK harness.")
+public interface SdkHarnessOptions extends PipelineOptions {
+ /**
+ * The set of log levels that can be used in the SDK harness.
+ */
+ enum LogLevel {
+ /** Special level used to turn off logging. */
+ OFF,
+
+ /** LogLevel for logging error messages. */
+ ERROR,
+
+ /** LogLevel for logging warning messages. */
+ WARN,
+
+ /** LogLevel for logging informational messages. */
+ INFO,
+
+ /** LogLevel for logging diagnostic messages. */
+ DEBUG,
+
+ /** LogLevel for logging tracing messages. */
+ TRACE
+ }
+
+ /**
+ * This option controls the default log level of all loggers without a log level override.
+ */
+ @Description("Controls the default log level of all loggers without a log level override.")
+ @Default.Enum("INFO")
+ LogLevel getDefaultSdkHarnessLogLevel();
+ void setDefaultSdkHarnessLogLevel(LogLevel logLevel);
+
+ /**
+ * This option controls the log levels for specifically named loggers.
+ *
+ * <p>Later options with equivalent names override earlier options.
+ *
+ * <p>See {@link SdkHarnessLogLevelOverrides} for more information on how to configure logging
+ * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
+ * the expected format is {"Name":"LogLevel",...}, further details on
+ * {@link SdkHarnessLogLevelOverrides#from}.
+ */
+ @Description("This option controls the log levels for specifically named loggers. "
+ + "The expected format is {\"Name\":\"LogLevel\",...}. The SDK harness supports a logging "
+ + "hierarchy based off of names that are '.' separated. For example, by specifying the value "
+ + "{\"a.b.c.Foo\":\"DEBUG\"}, the logger for the class 'a.b.c.Foo' will be configured to "
+ + "output logs at the DEBUG level. Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, "
+ + "all loggers underneath the 'a.b.c' package will be configured to output logs at the WARN "
+ + "level. System.out and System.err levels are configured via loggers of the corresponding "
+ + "name. Also, note that when multiple overrides are specified, the exact name followed by "
+ + "the closest parent takes precedence.")
+ SdkHarnessLogLevelOverrides getSdkHarnessLogLevelOverrides();
+ void setSdkHarnessLogLevelOverrides(SdkHarnessLogLevelOverrides value);
+
+ /**
+ * Defines a log level override for a specific class, package, or name.
+ *
+ * <p>The SDK harness supports a logging hierarchy based off of names that are "."
+ * separated. It is a common pattern to have the logger for a given class share the same name as
+ * the class itself. Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar},
+ * with loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"}
+ * respectively, we can override the log levels:
+ * <ul>
+ * <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
+ * representing {@code a.b.c.Foo}.
+ * <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
+ * the {@link Package} representing {@code a.b}.
+ * <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
+ * </ul>
+ *
+ * <p>{@code System.out} and {@code System.err} messages are configured via loggers of the
+ * corresponding name. Note that by specifying multiple overrides, the exact name followed by the
+ * closest parent takes precedence.
+ */
+ class SdkHarnessLogLevelOverrides extends HashMap<String, LogLevel> {
+ /**
+ * Overrides the default log level for the passed in class.
+ *
+ * <p>This is equivalent to calling
+ * {@link #addOverrideForName(String, LogLevel)}
+ * and passing in the {@link Class#getName() class name}.
+ */
+ public SdkHarnessLogLevelOverrides addOverrideForClass(Class<?> klass, LogLevel logLevel) {
+ checkNotNull(klass, "Expected class to be not null.");
+ addOverrideForName(klass.getName(), logLevel);
+ return this;
+ }
+
+ /**
+ * Overrides the default log level for the passed in package.
+ *
+ * <p>This is equivalent to calling
+ * {@link #addOverrideForName(String, LogLevel)}
+ * and passing in the {@link Package#getName() package name}.
+ */
+ public SdkHarnessLogLevelOverrides addOverrideForPackage(Package pkg, LogLevel logLevel) {
+ checkNotNull(pkg, "Expected package to be not null.");
+ addOverrideForName(pkg.getName(), logLevel);
+ return this;
+ }
+
+ /**
+ * Overrides the default log logLevel for the passed in name.
+ *
+ * <p>Note that because of the hierarchical nature of logger names, this will
+ * override the log logLevel of all loggers that have the passed in name or
+ * a parent logger that has the passed in name.
+ */
+ public SdkHarnessLogLevelOverrides addOverrideForName(String name, LogLevel logLevel) {
+ checkNotNull(name, "Expected name to be not null.");
+ checkNotNull(logLevel,
+ "Expected logLevel to be one of %s.", Arrays.toString(LogLevel.values()));
+ put(name, logLevel);
+ return this;
+ }
+
+ /**
+ * Expects a map keyed by logger {@code Name}s with values representing {@code LogLevel}s.
+ * The {@code Name} generally represents the fully qualified Java
+ * {@link Class#getName() class name}, or fully qualified Java
+ * {@link Package#getName() package name}, or custom logger name. The {@code LogLevel}
+ * represents the log level and must be one of {@link LogLevel}.
+ */
+ @JsonCreator
+ public static SdkHarnessLogLevelOverrides from(Map<String, String> values) {
+ checkNotNull(values, "Expected values to be not null.");
+ SdkHarnessLogLevelOverrides overrides = new SdkHarnessLogLevelOverrides();
+ for (Map.Entry<String, String> entry : values.entrySet()) {
+ try {
+ overrides.addOverrideForName(entry.getKey(), LogLevel.valueOf(entry.getValue()));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(String.format(
+ "Unsupported log level '%s' requested for %s. Must be one of %s.",
+ entry.getValue(), entry.getKey(), Arrays.toString(LogLevel.values())));
+ }
+
+ }
+ return overrides;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
new file mode 100644
index 0000000..565bbac
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel.WARN;
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.options.SdkHarnessOptions.SdkHarnessLogLevelOverrides;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SdkHarnessOptions}. */
+@RunWith(JUnit4.class)
+public class SdkHarnessOptionsTest {
+ private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+ ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testSdkHarnessLogLevelOverrideWithInvalidLogLevel() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Unsupported log level");
+ SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+ }
+
+ @Test
+ public void testSdkHarnessLogLevelOverrideForClass() throws Exception {
+ assertEquals("{\"org.junit.Test\":\"WARN\"}",
+ MAPPER.writeValueAsString(
+ new SdkHarnessLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
+ }
+
+ @Test
+ public void testSdkHarnessLogLevelOverrideForPackage() throws Exception {
+ assertEquals("{\"org.junit\":\"WARN\"}",
+ MAPPER.writeValueAsString(
+ new SdkHarnessLogLevelOverrides().addOverrideForPackage(
+ Test.class.getPackage(), WARN)));
+ }
+
+ @Test
+ public void testSdkHarnessLogLevelOverrideForName() throws Exception {
+ assertEquals("{\"A\":\"WARN\"}",
+ MAPPER.writeValueAsString(
+ new SdkHarnessLogLevelOverrides().addOverrideForName("A", WARN)));
+ }
+
+ @Test
+ public void testSerializationAndDeserializationOf() throws Exception {
+ String testValue = "{\"A\":\"WARN\"}";
+ assertEquals(testValue,
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(testValue, SdkHarnessLogLevelOverrides.class)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index d8d157f..6343b3e 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -173,12 +173,6 @@
</dependency>
<dependency>
- <!-- TODO: BEAM-2566 remove this dependency -->
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
index 62e8b44..0c615a9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -28,7 +28,7 @@ import io.netty.channel.unix.DomainSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
*/
public abstract class ManagedChannelFactory {
public static ManagedChannelFactory from(PipelineOptions options) {
- List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
io.netty.channel.epoll.Epoll.ensureAvailability();
return new Epoll();
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index eedac4a..97396e7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -25,8 +25,8 @@ import java.util.List;
import java.util.function.Consumer;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -81,7 +81,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
* returns the default buffer limit.
*/
private static int getBufferLimit(PipelineOptions options) {
- List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index b19277a..e7e0c71 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -52,9 +52,9 @@ import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
/**
* Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API.
@@ -70,14 +70,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
.put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
.build();
- private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
- ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
- .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
- .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
- .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
- .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
- .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
- .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
+ private static final ImmutableMap<SdkHarnessOptions.LogLevel, Level> LEVEL_CONFIGURATION =
+ ImmutableMap.<SdkHarnessOptions.LogLevel, Level>builder()
+ .put(SdkHarnessOptions.LogLevel.OFF, Level.OFF)
+ .put(SdkHarnessOptions.LogLevel.ERROR, Level.SEVERE)
+ .put(SdkHarnessOptions.LogLevel.WARN, Level.WARNING)
+ .put(SdkHarnessOptions.LogLevel.INFO, Level.INFO)
+ .put(SdkHarnessOptions.LogLevel.DEBUG, Level.FINE)
+ .put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST)
.build();
private static final Formatter FORMATTER = new SimpleFormatter();
@@ -119,14 +119,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
}
// Use the passed in logging options to configure the various logger levels.
- DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
- if (loggingOptions.getDefaultWorkerLogLevel() != null) {
- rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
+ SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
+ if (loggingOptions.getDefaultSdkHarnessLogLevel() != null) {
+ rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultSdkHarnessLogLevel()));
}
- if (loggingOptions.getWorkerLogLevelOverrides() != null) {
- for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
- loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
+ if (loggingOptions.getSdkHarnessLogLevelOverrides() != null) {
+ for (Map.Entry<String, SdkHarnessOptions.LogLevel> loggerOverride :
+ loggingOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
Logger logger = Logger.getLogger(loggerOverride.getKey());
logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
configuredLoggers.add(logger);
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
index 063d5af..99e33c2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
@@ -23,8 +23,8 @@ import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
*/
public abstract class StreamObserverFactory {
public static StreamObserverFactory fromOptions(PipelineOptions options) {
- List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
if (experiments != null && experiments.contains("beam_fn_api_buffered_stream")) {
int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
for (String experiment : experiments) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 015e5ec..c9057ea 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -134,8 +134,8 @@ public class BeamFnLoggingClientTest {
BeamFnLoggingClient client = new BeamFnLoggingClient(
PipelineOptionsFactory.fromArgs(new String[] {
- "--defaultWorkerLogLevel=OFF",
- "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
}).create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel);
@@ -197,8 +197,8 @@ public class BeamFnLoggingClientTest {
try {
BeamFnLoggingClient client = new BeamFnLoggingClient(
PipelineOptionsFactory.fromArgs(new String[] {
- "--defaultWorkerLogLevel=OFF",
- "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
}).create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel);
@@ -249,8 +249,8 @@ public class BeamFnLoggingClientTest {
try {
BeamFnLoggingClient client = new BeamFnLoggingClient(
PipelineOptionsFactory.fromArgs(new String[] {
- "--defaultWorkerLogLevel=OFF",
- "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
}).create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel);