You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/10/26 18:40:45 UTC

[2/3] beam git commit: [BEAM-2566] Decouple SDK harness from Dataflow runner by elevating experiments and SDK harness configuration to java-sdk-core.

[BEAM-2566] Decouple SDK harness from Dataflow runner by elevating experiments and SDK harness configuration to java-sdk-core.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5290f1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5290f1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5290f1a

Branch: refs/heads/master
Commit: e5290f1ad82192e85d4e7a838d94ee771d9e4f7b
Parents: cde3b7b
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 19 11:26:04 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 26 11:39:53 2017 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |  23 +--
 .../options/DataflowWorkerLoggingOptions.java   |   6 +
 .../DefaultPipelineOptionsRegistrar.java        |   2 +
 .../beam/sdk/options/ExperimentalOptions.java   |  38 ++++
 .../beam/sdk/options/SdkHarnessOptions.java     | 173 +++++++++++++++++++
 .../beam/sdk/options/SdkHarnessOptionsTest.java |  76 ++++++++
 sdks/java/harness/pom.xml                       |   6 -
 .../harness/channel/ManagedChannelFactory.java  |   4 +-
 .../BeamFnDataBufferingOutboundObserver.java    |   4 +-
 .../fn/harness/logging/BeamFnLoggingClient.java |  30 ++--
 .../harness/stream/StreamObserverFactory.java   |   4 +-
 .../logging/BeamFnLoggingClientTest.java        |  12 +-
 12 files changed, 324 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index d0ea722..ec108da 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -19,16 +19,14 @@ package org.apache.beam.runners.dataflow.options;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.api.services.dataflow.Dataflow;
-import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.GcsStager;
 import org.apache.beam.runners.dataflow.util.Stager;
-import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
@@ -40,24 +38,7 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 @Description("[Internal] Options used to control execution of the Dataflow SDK for "
     + "debugging and testing purposes.")
 @Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
-  /**
-   * The list of backend experiments to enable.
-   *
-   * <p>Dataflow provides a number of experimental features that can be enabled
-   * with this flag.
-   *
-   * <p>Please sync with the Dataflow team before enabling any experiments.
-   */
-  @Description("[Experimental] Dataflow provides a number of experimental features that can "
-      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
-      + "experiments.")
-  @Experimental
-  @Nullable
-  List<String> getExperiments();
-  void setExperiments(@Nullable List<String> value);
-
+public interface DataflowPipelineDebugOptions extends ExperimentalOptions, PipelineOptions {
   /**
    * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
    * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
index fae851c..a419b76 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -29,8 +29,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * Options that are used to control logging configuration on the Dataflow worker.
+ *
+ * @deprecated This interface will no longer be the source of truth for worker logging configuration
+ * once jobs are executed using a dedicated SDK harness instead of user code being co-located
+ * alongside Dataflow worker code. Please set the option below and also the corresponding option
+ * within {@link org.apache.beam.sdk.options.SdkHarnessOptions} to ensure forward compatibility.
  */
 @Description("Options that are used to control logging configuration on the Dataflow worker.")
+@Deprecated
 public interface DataflowWorkerLoggingOptions extends PipelineOptions {
   /**
    * The set of log levels that can be used on the Dataflow worker.

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
index 3375dc7..39debb5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java
@@ -33,6 +33,8 @@ public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar
         .add(PipelineOptions.class)
         .add(ApplicationNameOptions.class)
         .add(StreamingOptions.class)
+        .add(ExperimentalOptions.class)
+        .add(SdkHarnessOptions.class)
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
new file mode 100644
index 0000000..cb5c41c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Apache Beam provides a number of experimental features that can
+ * be enabled with this flag. If executing against a managed service, please contact the
+ * service owners before enabling any experiments.
+ */
+@Experimental
+@Hidden
+public interface ExperimentalOptions extends PipelineOptions {
+  @Description("[Experimental] Apache Beam provides a number of experimental features that can "
+      + "be enabled with this flag. If executing against a managed service, please contact the "
+      + "service owners before enabling any experiments.")
+  @Nullable
+  List<String> getExperiments();
+  void setExperiments(@Nullable List<String> value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
new file mode 100644
index 0000000..5f5dd6e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Options that are used to control configuration of the SDK harness.
+ */
+@Experimental
+@Description("Options that are used to control configuration of the SDK harness.")
+public interface SdkHarnessOptions extends PipelineOptions {
+  /**
+   * The set of log levels that can be used in the SDK harness.
+   */
+  enum LogLevel {
+    /** Special level used to turn off logging. */
+    OFF,
+
+    /** LogLevel for logging error messages. */
+    ERROR,
+
+    /** LogLevel for logging warning messages. */
+    WARN,
+
+    /** LogLevel for logging informational messages. */
+    INFO,
+
+    /** LogLevel for logging diagnostic messages. */
+    DEBUG,
+
+    /** LogLevel for logging tracing messages. */
+    TRACE
+  }
+
+  /**
+   * This option controls the default log level of all loggers without a log level override.
+   */
+  @Description("Controls the default log level of all loggers without a log level override.")
+  @Default.Enum("INFO")
+  LogLevel getDefaultSdkHarnessLogLevel();
+  void setDefaultSdkHarnessLogLevel(LogLevel logLevel);
+
+  /**
+   * This option controls the log levels for specifically named loggers.
+   *
+   * <p>Later options with equivalent names override earlier options.
+   *
+   * <p>See {@link SdkHarnessLogLevelOverrides} for more information on how to configure logging
+   * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
+   * the expected format is {"Name":"LogLevel",...}, further details on
+   * {@link SdkHarnessLogLevelOverrides#from}.
+   */
+  @Description("This option controls the log levels for specifically named loggers. "
+      + "The expected format is {\"Name\":\"LogLevel\",...}. The SDK harness supports a logging "
+      + "hierarchy based off of names that are '.' separated. For example, by specifying the value "
+      + "{\"a.b.c.Foo\":\"DEBUG\"}, the logger for the class 'a.b.c.Foo' will be configured to "
+      + "output logs at the DEBUG level. Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, "
+      + "all loggers underneath the 'a.b.c' package will be configured to output logs at the WARN "
+      + "level. System.out and System.err levels are configured via loggers of the corresponding "
+      + "name. Also, note that when multiple overrides are specified, the exact name followed by "
+      + "the closest parent takes precedence.")
+  SdkHarnessLogLevelOverrides getSdkHarnessLogLevelOverrides();
+  void setSdkHarnessLogLevelOverrides(SdkHarnessLogLevelOverrides value);
+
+  /**
+   * Defines a log level override for a specific class, package, or name.
+   *
+   * <p>The SDK harness supports a logging hierarchy based off of names that are "."
+   * separated. It is a common pattern to have the logger for a given class share the same name as
+   * the class itself. Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar},
+   * with loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"}
+   * respectively, we can override the log levels:
+   * <ul>
+   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
+   *    representing {@code a.b.c.Foo}.
+   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
+   *    the {@link Package} representing {@code a.b}.
+   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
+   * </ul>
+   *
+   * <p>{@code System.out} and {@code System.err} messages are configured via loggers of the
+   * corresponding name. Note that by specifying multiple overrides, the exact name followed by the
+   * closest parent takes precedence.
+   */
+  class SdkHarnessLogLevelOverrides extends HashMap<String, LogLevel> {
+    /**
+     * Overrides the default log level for the passed in class.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, LogLevel)}
+     * and passing in the {@link Class#getName() class name}.
+     */
+    public SdkHarnessLogLevelOverrides addOverrideForClass(Class<?> klass, LogLevel logLevel) {
+      checkNotNull(klass, "Expected class to be not null.");
+      addOverrideForName(klass.getName(), logLevel);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in package.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, LogLevel)}
+     * and passing in the {@link Package#getName() package name}.
+     */
+    public SdkHarnessLogLevelOverrides addOverrideForPackage(Package pkg, LogLevel logLevel) {
+      checkNotNull(pkg, "Expected package to be not null.");
+      addOverrideForName(pkg.getName(), logLevel);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in name.
+     *
+     * <p>Note that because of the hierarchical nature of logger names, this will
+     * override the log level of all loggers that have the passed in name or
+     * a parent logger that has the passed in name.
+     */
+    public SdkHarnessLogLevelOverrides addOverrideForName(String name, LogLevel logLevel) {
+      checkNotNull(name, "Expected name to be not null.");
+      checkNotNull(logLevel,
+          "Expected logLevel to be one of %s.", Arrays.toString(LogLevel.values()));
+      put(name, logLevel);
+      return this;
+    }
+
+    /**
+     * Expects a map keyed by logger {@code Name}s with values representing {@code LogLevel}s.
+     * The {@code Name} generally represents the fully qualified Java
+     * {@link Class#getName() class name}, or fully qualified Java
+     * {@link Package#getName() package name}, or custom logger name. The {@code LogLevel}
+     * represents the log level and must be one of {@link LogLevel}.
+     */
+    @JsonCreator
+    public static SdkHarnessLogLevelOverrides from(Map<String, String> values) {
+      checkNotNull(values, "Expected values to be not null.");
+      SdkHarnessLogLevelOverrides overrides = new SdkHarnessLogLevelOverrides();
+      for (Map.Entry<String, String> entry : values.entrySet()) {
+        try {
+          overrides.addOverrideForName(entry.getKey(), LogLevel.valueOf(entry.getValue()));
+        } catch (IllegalArgumentException e) {
+          // Keep the cause so the original valueOf failure is not lost from the stack trace.
+          throw new IllegalArgumentException(String.format(
+              "Unsupported log level '%s' requested for %s. Must be one of %s.",
+              entry.getValue(), entry.getKey(), Arrays.toString(LogLevel.values())), e);
+        }
+      }
+      return overrides;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
new file mode 100644
index 0000000..565bbac
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/SdkHarnessOptionsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel.WARN;
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.options.SdkHarnessOptions.SdkHarnessLogLevelOverrides;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SdkHarnessOptions}. */
+@RunWith(JUnit4.class)
+public class SdkHarnessOptionsTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testSdkHarnessLogLevelOverrideWithInvalidLogLevel() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Unsupported log level");
+    SdkHarnessLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+  }
+
+  @Test
+  public void testSdkHarnessLogLevelOverrideForClass() throws Exception {
+    assertEquals("{\"org.junit.Test\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new SdkHarnessLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
+  }
+
+  @Test
+  public void testSdkHarnessLogLevelOverrideForPackage() throws Exception {
+    assertEquals("{\"org.junit\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new SdkHarnessLogLevelOverrides().addOverrideForPackage(
+                Test.class.getPackage(), WARN)));
+  }
+
+  @Test
+  public void testSdkHarnessLogLevelOverrideForName() throws Exception {
+    assertEquals("{\"A\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new SdkHarnessLogLevelOverrides().addOverrideForName("A", WARN)));
+  }
+
+  @Test
+  public void testSerializationAndDeserializationOf() throws Exception {
+    String testValue = "{\"A\":\"WARN\"}";
+    assertEquals(testValue,
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(testValue, SdkHarnessLogLevelOverrides.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index d8d157f..6343b3e 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -173,12 +173,6 @@
     </dependency>
 
     <dependency>
-      <!-- TODO: BEAM-2566 remove this dependency -->
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
index 62e8b44..0c615a9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -28,7 +28,7 @@ import io.netty.channel.unix.DomainSocketAddress;
 import java.net.SocketAddress;
 import java.util.List;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public abstract class ManagedChannelFactory {
   public static ManagedChannelFactory from(PipelineOptions options) {
-    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
     if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
       io.netty.channel.epoll.Epoll.ensureAvailability();
       return new Epoll();

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index eedac4a..97396e7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -25,8 +25,8 @@ import java.util.List;
 import java.util.function.Consumer;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -81,7 +81,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
    * returns the default buffer limit.
    */
   private static int getBufferLimit(PipelineOptions options) {
-    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
     for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
       if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
         return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index b19277a..e7e0c71 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -52,9 +52,9 @@ import java.util.logging.SimpleFormatter;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
 
 /**
  * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API.
@@ -70,14 +70,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
       .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
       .build();
 
-  private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
-      ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
-          .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
-          .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
-          .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
-          .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
-          .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
-          .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
+  private static final ImmutableMap<SdkHarnessOptions.LogLevel, Level> LEVEL_CONFIGURATION =
+      ImmutableMap.<SdkHarnessOptions.LogLevel, Level>builder()
+          .put(SdkHarnessOptions.LogLevel.OFF, Level.OFF)
+          .put(SdkHarnessOptions.LogLevel.ERROR, Level.SEVERE)
+          .put(SdkHarnessOptions.LogLevel.WARN, Level.WARNING)
+          .put(SdkHarnessOptions.LogLevel.INFO, Level.INFO)
+          .put(SdkHarnessOptions.LogLevel.DEBUG, Level.FINE)
+          .put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST)
           .build();
 
   private static final Formatter FORMATTER = new SimpleFormatter();
@@ -119,14 +119,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
     }
 
     // Use the passed in logging options to configure the various logger levels.
-    DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
-    if (loggingOptions.getDefaultWorkerLogLevel() != null) {
-      rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
+    SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
+    if (loggingOptions.getDefaultSdkHarnessLogLevel() != null) {
+      rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultSdkHarnessLogLevel()));
     }
 
-    if (loggingOptions.getWorkerLogLevelOverrides() != null) {
-      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
-        loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
+    if (loggingOptions.getSdkHarnessLogLevelOverrides() != null) {
+      for (Map.Entry<String, SdkHarnessOptions.LogLevel> loggerOverride :
+        loggingOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
         Logger logger = Logger.getLogger(loggerOverride.getKey());
         logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
         configuredLoggers.add(logger);

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
index 063d5af..99e33c2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
@@ -23,8 +23,8 @@ import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public abstract class StreamObserverFactory {
   public static StreamObserverFactory fromOptions(PipelineOptions options) {
-    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
     if (experiments != null && experiments.contains("beam_fn_api_buffered_stream")) {
       int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
       for (String experiment : experiments) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e5290f1a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 015e5ec..c9057ea 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -134,8 +134,8 @@ public class BeamFnLoggingClientTest {
 
       BeamFnLoggingClient client = new BeamFnLoggingClient(
           PipelineOptionsFactory.fromArgs(new String[] {
-              "--defaultWorkerLogLevel=OFF",
-              "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+              "--defaultSdkHarnessLogLevel=OFF",
+              "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
           }).create(),
           apiServiceDescriptor,
           (Endpoints.ApiServiceDescriptor descriptor) -> channel);
@@ -197,8 +197,8 @@ public class BeamFnLoggingClientTest {
     try {
       BeamFnLoggingClient client = new BeamFnLoggingClient(
           PipelineOptionsFactory.fromArgs(new String[] {
-              "--defaultWorkerLogLevel=OFF",
-              "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+              "--defaultSdkHarnessLogLevel=OFF",
+              "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
           }).create(),
           apiServiceDescriptor,
           (Endpoints.ApiServiceDescriptor descriptor) -> channel);
@@ -249,8 +249,8 @@ public class BeamFnLoggingClientTest {
     try {
       BeamFnLoggingClient client = new BeamFnLoggingClient(
           PipelineOptionsFactory.fromArgs(new String[] {
-              "--defaultWorkerLogLevel=OFF",
-              "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+              "--defaultSdkHarnessLogLevel=OFF",
+              "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
           }).create(),
           apiServiceDescriptor,
           (Endpoints.ApiServiceDescriptor descriptor) -> channel);