Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 19:06:43 UTC

[1/5] beam git commit: Move stable name validation to Pipeline.run()

Repository: beam
Updated Branches:
  refs/heads/master be883666f -> f5e3f5230


Move stable name validation to Pipeline.run()

The action taken when a pipeline does not have unique stable
names depends on the PipelineOptions, which will not be available
during construction. Moving this check later removes one blocker
from the refactor of PipelineOptions availability.
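
As an illustrative sketch (not part of the commit): with this change, a
pipeline whose transforms collide on their default names still constructs
fine, and the StableUniqueNames setting is only consulted at run(). The
class StableNamesExample and its pipeline body are hypothetical; Pipeline,
Create, and the stableUniqueNames option are existing Beam APIs.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class StableNamesExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setStableUniqueNames(CheckEnabled.ERROR);
        Pipeline p = Pipeline.create(options);

        // Both applications default to the same transform name, so the
        // second one gets an auto-generated unique suffix -- a name that
        // is not stable across versions of the pipeline.
        p.apply(Create.of(1, 2, 3));
        p.apply(Create.of(1, 2, 3));

        // Previously the ERROR setting threw inside the second apply();
        // after this commit the IllegalStateException is raised here.
        p.run();
      }
    }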


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6ebb045
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6ebb045
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6ebb045

Branch: refs/heads/master
Commit: f6ebb04513497c5a835e6b03cad3cefe20c682ed
Parents: f29444b
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 12:49:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 10:41:32 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/Pipeline.java | 49 ++++++++++++--------
 .../java/org/apache/beam/sdk/PipelineTest.java  | 11 +++--
 2 files changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f6ebb045/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 88ecc0b..716b328 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -19,8 +19,11 @@ package org.apache.beam.sdk;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -274,6 +277,7 @@ public class Pipeline {
     // pipeline.
     LOG.debug("Running {} via {}", this, runner);
     try {
+      validate(options);
       return runner.run(this);
     } catch (UserCodeException e) {
       // This serves to replace the stack with one that ends here and
@@ -418,6 +422,7 @@ public class Pipeline {
   private final TransformHierarchy transforms = new TransformHierarchy(this);
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
+  private final List<String> unstableNames = new ArrayList<>();
 
   protected Pipeline(PipelineRunner<?> runner, PipelineOptions options) {
     this.runner = runner;
@@ -442,25 +447,7 @@ public class Pipeline {
     boolean nameIsUnique = uniqueName.equals(buildName(namePrefix, name));
 
     if (!nameIsUnique) {
-      switch (getOptions().getStableUniqueNames()) {
-        case OFF:
-          break;
-        case WARNING:
-          LOG.warn(
-              "Transform {} does not have a stable unique name. "
-                  + "This will prevent updating of pipelines.",
-              uniqueName);
-          break;
-        case ERROR:
-          throw new IllegalStateException(
-              "Transform "
-                  + uniqueName
-                  + " does not have a stable unique name. "
-                  + "This will prevent updating of pipelines.");
-        default:
-          throw new IllegalArgumentException(
-              "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
-      }
+      unstableNames.add(uniqueName);
     }
 
     LOG.debug("Adding {} to {}", transform, this);
@@ -504,6 +491,30 @@ public class Pipeline {
     }
   }
 
+  @VisibleForTesting
+  void validate(PipelineOptions options) {
+    if (!unstableNames.isEmpty()) {
+      switch (options.getStableUniqueNames()) {
+        case OFF:
+          break;
+        case WARNING:
+          LOG.warn(
+              "The following transforms do not have stable unique names: {}",
+              Joiner.on(", ").join(unstableNames));
+          break;
+        case ERROR:
+          throw new IllegalStateException(
+              String.format(
+                  "Pipeline update will not be possible"
+                      + " because the following transforms do not have stable unique names: %s.",
+                  Joiner.on(", ").join(unstableNames)));
+        default:
+          throw new IllegalArgumentException(
+              "Unrecognized value for stable unique names: " + options.getStableUniqueNames());
+      }
+    }
+  }
+
   /**
    * Returns the {@link PipelineOptions} provided at the time this {@link Pipeline} was created.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/f6ebb045/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index af629a6..5ddfc57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -189,8 +189,8 @@ public class PipelineTest {
 
     pipeline.apply(Create.of(5, 6, 7));
     pipeline.apply(Create.of(5, 6, 7));
-
-    logged.verifyNotLogged("does not have a stable unique name.");
+    ((Pipeline) pipeline).validate(pipeline.getOptions());
+    logged.verifyNotLogged("do not have stable unique names");
   }
 
   @Test
@@ -201,8 +201,8 @@ public class PipelineTest {
 
     pipeline.apply(Create.of(5, 6, 7));
     pipeline.apply(Create.of(5, 6, 7));
-
-    logged.verifyWarn("does not have a stable unique name.");
+    ((Pipeline) pipeline).validate(pipeline.getOptions());
+    logged.verifyWarn("do not have stable unique names");
   }
 
   @Test
@@ -211,8 +211,9 @@ public class PipelineTest {
 
     pipeline.apply(Create.of(5, 6, 7));
 
-    thrown.expectMessage("does not have a stable unique name.");
+    thrown.expectMessage("do not have stable unique names");
     pipeline.apply(Create.of(5, 6, 7));
+    ((Pipeline) pipeline).validate(pipeline.getOptions());
   }
 
   /**


[3/5] beam git commit: Move PTransform.validate to post-construction, modulo BigQueryIO

Posted by ke...@apache.org.
Move PTransform.validate to post-construction, modulo BigQueryIO

PipelineOptions, as used for most validation, should not be available
at construction time. Instead, validation will be called just before
running a pipeline.

BigQueryIO is particularly problematic and will get further treatment.
For now, the workaround is to establish the proper validation methods
but still to call them (incorrectly, strictly speaking) at construction
time in expand().
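
As a hedged sketch of the new shape (MyWrite and its check are
hypothetical; the validate(PipelineOptions) hook and the stopgap call
from expand() mirror the diffs below):

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;

    class MyWrite extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public void validate(PipelineOptions options) {
        // Post-construction hook: invoked from Pipeline.run(), when the
        // options are finally available.
        if (options.getTempLocation() == null) {
          throw new IllegalArgumentException(
              "MyWrite requires a temp location to stage files.");
        }
      }

      @Override
      public PCollection<String> expand(PCollection<String> input) {
        // BigQueryIO-style stopgap only: also invoke the hook at
        // construction time, even though options may not be final here.
        validate(input.getPipeline().getOptions());
        return input;
      }
    }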


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19aa8ba5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19aa8ba5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19aa8ba5

Branch: refs/heads/master
Commit: 19aa8ba576c2d43166dc6d67a4c5c103b3522870
Parents: 4291fa6
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 14:37:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 11:39:00 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ForwardingPTransform.java |  5 ++-
 .../construction/ForwardingPTransformTest.java  |  7 ++--
 .../beam/runners/dataflow/AssignWindows.java    |  5 ++-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 22 ++++++++++-
 .../apache/beam/sdk/coders/CoderRegistry.java   |  9 ++++-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  7 +++-
 .../apache/beam/sdk/transforms/GroupByKey.java  | 27 ++++++-------
 .../apache/beam/sdk/transforms/PTransform.java  |  8 ++--
 .../org/apache/beam/sdk/transforms/View.java    | 25 +++---------
 .../beam/sdk/transforms/windowing/Window.java   |  5 ++-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |  4 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 27 +++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 41 ++++++--------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 +++---
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  4 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  6 +--
 .../hadoop/inputformat/HadoopInputFormatIO.java | 10 ++---
 .../inputformat/HadoopInputFormatIOTest.java    | 27 +++++++------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |  4 +-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  1 +
 .../java/org/apache/beam/sdk/io/hdfs/Write.java |  7 +++-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |  5 ++-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 37 ++++++------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +-
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |  4 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  5 ++-
 27 files changed, 168 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index 3bee281..2f427ad 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PInput;
@@ -40,8 +41,8 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
   }
 
   @Override
-  public void validate(InputT input) {
-    delegate().validate(input);
+  public void validate(PipelineOptions options) {
+    delegate().validate(options);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 7d3bfd8..74c056c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -79,11 +80,11 @@ public class ForwardingPTransformTest {
   @Test
   public void validateDelegates() {
     @SuppressWarnings("unchecked")
-    PCollection<Integer> input = Mockito.mock(PCollection.class);
-    Mockito.doThrow(RuntimeException.class).when(delegate).validate(input);
+    PipelineOptions options = Mockito.mock(PipelineOptions.class);
+    Mockito.doThrow(RuntimeException.class).when(delegate).validate(options);
 
     thrown.expect(RuntimeException.class);
-    forwarding.validate(input);
+    forwarding.validate(options);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 3e36899..ffd56c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -73,8 +74,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
   }
 
   @Override
-  public void validate(PCollection<T> input) {
-    transform.validate(input);
+  public void validate(PipelineOptions options) {
+    transform.validate(options);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 716b328..203bd14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -454,7 +454,6 @@ public class Pipeline {
     transforms.pushNode(uniqueName, input, transform);
     try {
       transforms.finishSpecifyingInput();
-      transform.validate(input);
       OutputT output = transform.expand(input);
       transforms.setOutput(output);
 
@@ -493,6 +492,7 @@ public class Pipeline {
 
   @VisibleForTesting
   void validate(PipelineOptions options) {
+    this.traverseTopologically(new ValidateVisitor(options));
     if (!unstableNames.isEmpty()) {
       switch (options.getStableUniqueNames()) {
         case OFF:
@@ -561,4 +561,24 @@ public class Pipeline {
   private String buildName(String namePrefix, String name) {
     return namePrefix.isEmpty() ? name : namePrefix + "/" + name;
   }
+
+  private static class ValidateVisitor extends PipelineVisitor.Defaults {
+
+    private final PipelineOptions options;
+
+    public ValidateVisitor(PipelineOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(Node node) {
+      node.getTransform().validate(options);
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      node.getTransform().validate(options);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index ab0a3e1..7f0f632 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -255,6 +255,9 @@ public class CoderRegistry implements CoderProvider {
       TypeDescriptor<InputT> inputTypeDescriptor,
       Coder<InputT> inputCoder)
       throws CannotProvideCoderException {
+    checkArgument(typeDescriptor != null);
+    checkArgument(inputTypeDescriptor != null);
+    checkArgument(inputCoder != null);
     return getDefaultCoder(
         typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
   }
@@ -859,6 +862,8 @@ public class CoderRegistry implements CoderProvider {
    * in the given {@link Coder}.
    */
   private Map<Type, Coder<?>> getTypeToCoderBindings(Type type, Coder<?> coder) {
+    checkArgument(type != null);
+    checkArgument(coder != null);
     if (type instanceof TypeVariable || type instanceof Class) {
       return ImmutableMap.<Type, Coder<?>>of(type, coder);
     } else if (type instanceof ParameterizedType) {
@@ -889,7 +894,9 @@ public class CoderRegistry implements CoderProvider {
       for (int i = 0; i < typeArguments.size(); i++) {
         Type typeArgument = typeArguments.get(i);
         Coder<?> coderArgument = coderArguments.get(i);
-        typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument));
+        if (coderArgument != null) {
+          typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument));
+        }
       }
 
       return ImmutableMap.<Type, Coder<?>>builder().putAll(typeToCoder).build();

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index ba41593..dcd600f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -122,14 +122,17 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
         "%s can only be applied to an unbounded PCollection if doing windowed writes",
         WriteFiles.class.getSimpleName());
-    PipelineOptions options = input.getPipeline().getOptions();
-    sink.validate(options);
     this.writeOperation = sink.createWriteOperation();
     this.writeOperation.setWindowedWrites(windowedWrites);
     return createWrite(input);
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
+
+  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index d9c4c9f..6a89c5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -184,21 +184,6 @@ public class GroupByKey<K, V>
     }
   }
 
-  @Override
-  public void validate(PCollection<KV<K, V>> input) {
-    applicableTo(input);
-
-    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
-    // the key coder is deterministic.
-    Coder<K> keyCoder = getKeyCoder(input.getCoder());
-    try {
-      keyCoder.verifyDeterministic();
-    } catch (NonDeterministicException e) {
-      throw new IllegalStateException(
-          "the keyCoder of a GroupByKey must be deterministic", e);
-    }
-  }
-
   public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
     WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
     if (!inputWindowFn.isNonMerging()) {
@@ -216,6 +201,18 @@ public class GroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    applicableTo(input);
+
+    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
+    // the key coder is deterministic.
+    Coder<K> keyCoder = getKeyCoder(input.getCoder());
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalStateException(
+          "the keyCoder of a GroupByKey must be deterministic", e);
+    }
+
     // This primitive operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 687938d..4f651f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.NameUtils;
@@ -187,13 +188,12 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
   public abstract OutputT expand(InputT input);
 
   /**
-   * Called before invoking apply (which may be intercepted by the runner) to
-   * verify this transform is fully specified and applicable to the specified
-   * input.
+   * Called before running the Pipeline to verify this transform is fully and correctly
+   * specified.
    *
    * <p>By default, does nothing.
    */
-  public void validate(InputT input) {}
+  public void validate(PipelineOptions options) {}
 
   /**
    * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 14035b0..0495ad6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -246,16 +246,13 @@ public class View {
     private AsList() { }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
           input, input.getWindowingStrategy(), input.getCoder())));
     }
@@ -272,16 +269,13 @@ public class View {
     private AsIterable() { }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
           input, input.getWindowingStrategy(), input.getCoder())));
     }
@@ -329,16 +323,13 @@ public class View {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> singletonCombine =
           Combine.globally(new SingletonCombineFn<>(hasDefault, input.getCoder(), defaultValue));
       if (!hasDefault) {
@@ -415,16 +406,13 @@ public class View {
     private AsMultimap() { }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
           PCollectionViews.multimapView(
               input,
@@ -452,16 +440,13 @@ public class View {
     }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
           PCollectionViews.mapView(
               input,

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index cb7b430..d010d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -324,8 +324,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     return result;
   }
 
-  @Override
-  public void validate(PCollection<T> input) {
+  private void applicableTo(PCollection<?> input) {
     WindowingStrategy<?, ?> outputStrategy =
         getOutputStrategyInternal(input.getWindowingStrategy());
 
@@ -363,6 +362,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
 
   @Override
   public PCollection<T> expand(PCollection<T> input) {
+    applicableTo(input);
+
     WindowingStrategy<?, ?> outputStrategy =
         getOutputStrategyInternal(input.getWindowingStrategy());
     if (getWindowFn() == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 8e138ef..0a3b900 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -399,7 +399,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(
           getConnectionConfiguration() != null,
           "ElasticsearchIO.read() requires a connection configuration"
@@ -725,7 +725,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public void validate(PCollection<String> input) {
+    public void validate(PipelineOptions options) {
       checkState(
           getConnectionConfiguration() != null,
           "ElasticsearchIO.write() requires a connection configuration"

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 38a0f6c..593c580 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -18,8 +18,11 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +34,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -45,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -77,10 +82,32 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    // We will use a BigQuery load job -- validate the temp location.
+    String tempLocation = options.getTempLocation();
+    checkArgument(
+        !Strings.isNullOrEmpty(tempLocation),
+        "BigQueryIO.Write needs a GCS temp location to store temp files.");
+    if (write.getBigQueryServices() == null) {
+      try {
+        GcsPath.fromUri(tempLocation);
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                tempLocation),
+            e);
+      }
+    }
+  }
+
+  @Override
   public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
 
+    validate(p.getOptions());
+
     final String stepUuid = BigQueryHelpers.randomUUIDString();
 
     String tempLocation = options.getTempLocation();

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7ade33f..ea97906 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -388,10 +388,10 @@ public class BigQueryIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       // Even if existence validation is disabled, we need to make sure that the BigQueryIO
       // read is properly specified.
-      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
 
       String tempLocation = bqOptions.getTempLocation();
       checkArgument(
@@ -905,8 +905,8 @@ public class BigQueryIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
-      BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    public void validate(PipelineOptions pipelineOptions) {
+      BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
 
       // We must have a destination to write to!
       checkState(getTableFunction() != null,
@@ -939,35 +939,13 @@ public class BigQueryIO {
           BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
         }
       }
-
-      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
-        // We will use BigQuery's streaming write API -- validate supported dispositions.
-        checkArgument(
-            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
-            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
-            + " PCollection.");
-      } else {
-        // We will use a BigQuery load job -- validate the temp location.
-        String tempLocation = options.getTempLocation();
-        checkArgument(
-            !Strings.isNullOrEmpty(tempLocation),
-            "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (getBigQueryServices() == null) {
-          try {
-            GcsPath.fromUri(tempLocation);
-          } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException(
-                String.format(
-                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                    tempLocation),
-                e);
-          }
-        }
-      }
     }
 
     @Override
     public WriteResult expand(PCollection<T> input) {
+
+      validate(input.getPipeline().getOptions());
+
       PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
           input.apply("PrepareWrite", new PrepareWrite<T>(
               getTableFunction(), getFormatFunction()))
@@ -977,6 +955,11 @@ public class BigQueryIO {
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamingInserts and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED) {
+        checkArgument(
+            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
+            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
+                + " PCollection.");
+
         return rowsWithDestination.apply(new StreamingInserts(this));
       } else {
         return rowsWithDestination.apply(new BatchLoads(this));

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ca7460..69fac68 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -286,12 +286,12 @@ public class BigtableIO {
     }
 
     @Override
-    public void validate(PBegin input) {
-      checkArgument(options != null, "BigtableOptions not specified");
+    public void validate(PipelineOptions options) {
+      checkArgument(this.options != null, "BigtableOptions not specified");
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            getBigtableService(options).tableExists(tableId),
             "Table %s does not exist",
             tableId);
       } catch (IOException e) {
@@ -492,12 +492,12 @@ public class BigtableIO {
     }
 
     @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
+    public void validate(PipelineOptions options) {
+      checkArgument(this.options != null, "BigtableOptions not specified");
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            getBigtableService(options).tableExists(tableId),
             "Table %s does not exist",
             tableId);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index aa0019c..f619429 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -591,7 +591,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getProjectId(), "projectId");
 
       if (getProjectId().isAccessible() && getProjectId().get() == null) {
@@ -1068,7 +1068,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(projectId, "projectId ValueProvider");
       if (projectId.isAccessible()) {
         checkNotNull(projectId.get(), "projectId");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 9e93887..0b94ded 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -270,8 +270,6 @@ public class BigtableIOTest {
   /** Tests that when reading from a non-existent table, the read fails. */
   @Test
   public void testReadingFailsTableDoesNotExist() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
     final String table = "TEST-TABLE";
 
     BigtableIO.Read read =
@@ -285,6 +283,7 @@ public class BigtableIOTest {
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
     p.apply(read);
+    p.run();
   }
 
   /** Tests that when reading from an empty table, the read succeeds. */
@@ -615,8 +614,6 @@ public class BigtableIOTest {
   /** Tests that when writing to a non-existent table, the write fails. */
   @Test
   public void testWritingFailsTableDoesNotExist() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
     final String table = "TEST-TABLE";
 
     PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
@@ -629,6 +626,7 @@ public class BigtableIOTest {
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
     emptyInput.apply("write", defaultWrite.withTableId(table));
+    p.run();
   }
 
   /** Tests that when writing an element fails, the write fails. */

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 93ff108..336740c 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -23,7 +23,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -39,9 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -288,6 +285,7 @@ public class HadoopInputFormatIO {
 
     @Override
     public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
       // Get the key and value coders based on the key and value classes.
       CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
       Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry);
@@ -315,10 +313,10 @@ public class HadoopInputFormatIO {
     }
 
     /**
-     * Validates inputs provided by the pipeline user before reading the data.
+     * Validates construction of this transform.
      */
-    @Override
-    public void validate(PBegin input) {
+    @VisibleForTesting
+    void validateTransform() {
       checkNotNull(getConfiguration(), "getConfiguration()");
       // Validate that the key translation input type must be same as key class of InputFormat.
       validateTranslationFunction(getinputFormatKeyClass(), getKeyTranslationFunction(),

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index ccde03f..aeeeb17 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertThat;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -272,15 +271,15 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when Read transform is created without calling
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when Read transform is created without calling
    * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}.
    */
   @Test
   public void testReadValidationFailsMissingConfiguration() {
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read();
     thrown.expect(NullPointerException.class);
-    read.validate(input);
+    read.validateTransform();
   }
 
   /**
@@ -328,10 +327,10 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myKeyTranslate's (simple function provided by user for key
-   * translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set in
-   * configuration as "key.class").
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when myKeyTranslate's (simple function provided by user for
+   * key translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set
+   * in configuration as "key.class").
    */
   @Test
   public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
@@ -351,14 +350,14 @@ public class HadoopInputFormatIOTest {
         serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
             InputFormat.class), serConf.getHadoopConfiguration()
             .getClass("key.class", Object.class)));
-    read.validate(input);
+    read.validateTransform();
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myValueTranslate's (simple function provided by user for value
-   * translation) input type is not same as Hadoop InputFormat's valueClass(Which is property set in
-   * configuration as "value.class").
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when myValueTranslate's (simple function provided by user
+   * for value translation) input type is not same as Hadoop InputFormat's valueClass(Which is
+   * property set in configuration as "value.class").
    */
   @Test
   public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() {
@@ -382,7 +381,7 @@ public class HadoopInputFormatIOTest {
             serConf.getHadoopConfiguration().getClass("value.class", Object.class));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(expectedMessage);
-    read.validate(input);
+    read.validateTransform();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 1c8afbd..eee8927 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -255,7 +255,7 @@ public class HBaseIO {
         }
 
         @Override
-        public void validate(PBegin input) {
+        public void validate(PipelineOptions options) {
             checkArgument(serializableConfiguration != null,
                     "Configuration not provided");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
@@ -580,7 +580,7 @@ public class HBaseIO {
         }
 
         @Override
-        public void validate(PCollection<KV<byte[], Iterable<Mutation>>> input) {
+        public void validate(PipelineOptions options) {
             checkArgument(serializableConfiguration != null, "Configuration not specified");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
             try (Connection connection = ConnectionFactory.createConnection(

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index bf8cb4b..6308931 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -292,6 +292,7 @@ public class HBaseIOTest {
         thrown.expectMessage(String.format("Table %s does not exist", table));
 
         emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.run();
     }
 
     /** Tests that when writing an element fails, the write fails. */

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 03e7c70..86a9246 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -102,12 +102,15 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
         "%s can only be applied to an unbounded PCollection if doing windowed writes",
         Write.class.getSimpleName());
-    PipelineOptions options = input.getPipeline().getOptions();
-    sink.validate(options);
     return createWrite(input, sink.createWriteOperation());
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
+
+  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index b26a47d..2d48236 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -32,6 +32,7 @@ import javax.sql.DataSource;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -340,7 +341,7 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(getQuery() != null,
           "JdbcIO.read() requires a query to be set via withQuery(query)");
       checkState(getRowMapper() != null,
@@ -445,7 +446,7 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkArgument(getDataSourceConfiguration() != null,
           "JdbcIO.write() requires a configuration to be set via "
               + ".withDataSourceConfiguration(configuration)");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 104bea4..813e051 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -295,7 +295,7 @@ public class JmsIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(getConnectionFactory() != null, "JmsIO.read() requires a JMS connection "
           + "factory to be set via withConnectionFactory(connectionFactory)");
       checkState((getQueue() != null || getTopic() != null), "JmsIO.read() requires a JMS "
@@ -654,7 +654,7 @@ public class JmsIO {
     }
 
     @Override
-    public void validate(PCollection<String> input) {
+    public void validate(PipelineOptions options) {
       checkState(getConnectionFactory() != null, "JmsIO.write() requires a JMS connection "
           + "factory to be set via withConnectionFactory(connectionFactory)");
       checkState((getQueue() != null || getTopic() != null), "JmsIO.write() requires a JMS "

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 211f1a4..000df70 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -617,30 +617,13 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PBegin input)  {
+    public void validate(PipelineOptions options) {
       checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
           "Kafka topics or topic_partitions are required");
       checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
       checkNotNull(getValueDeserializer(), "Value deserializer must be set");
-
-      if (input != null) {
-        CoderRegistry registry = input.getPipeline().getCoderRegistry();
-
-        checkNotNull(getKeyCoder() != null
-                        ? getKeyCoder()
-                        : inferCoder(registry, getKeyDeserializer()),
-                "Key coder must be set");
-
-        checkNotNull(getValueCoder() != null
-                        ? getValueCoder()
-                        : inferCoder(registry, getValueDeserializer()),
-                "Value coder must be set");
-      } else {
-        checkNotNull(getKeyCoder(), "Key coder must be set");
-        checkNotNull(getValueCoder(), "Value coder must be set");
-      }
     }
 
     @Override
@@ -648,13 +631,17 @@ public class KafkaIO {
       // Infer key/value coders if not specified explicitly
       CoderRegistry registry = input.getPipeline().getCoderRegistry();
 
-      Coder<K> keyCoder = getKeyCoder() != null
-              ? getKeyCoder()
-              : inferCoder(registry, getKeyDeserializer());
+      Coder<K> keyCoder =
+          checkNotNull(
+              getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
+              "Key coder must be set");
 
-      Coder<V> valueCoder = getValueCoder() != null
-              ? getValueCoder()
-              : inferCoder(registry, getValueDeserializer());
+      Coder<V> valueCoder =
+          checkNotNull(
+              getValueCoder() != null
+                  ? getValueCoder()
+                  : inferCoder(registry, getValueDeserializer()),
+              "Value coder must be set");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -1523,7 +1510,7 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 2b7fb0a..f8edbf1 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -167,7 +167,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(uri(), "uri");
       checkNotNull(database(), "database");
       checkNotNull(collection(), "collection");
@@ -444,7 +444,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public void validate(PCollection<Document> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(uri(), "uri");
       checkNotNull(database(), "database");
       checkNotNull(collection(), "collection");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 1df445f..0f25b0f 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -291,7 +291,7 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       // validation is performed in the ConnectionConfiguration create()
     }
 
@@ -541,7 +541,7 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PCollection<byte[]> input) {
+    public void validate(PipelineOptions options) {
       // validate is done in connection configuration
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 47715e9..c41d6bc 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CompressedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -381,7 +382,7 @@ public class XmlIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(
           getRootElement(),
           "rootElement is null. Use builder method withRootElement() to set this.");
@@ -505,7 +506,7 @@ public class XmlIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
       checkNotNull(getRootElement(), "Missing a root element name.");
       checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");


[5/5] beam git commit: This closes #2666: Remove some construction-time uses of PipelineOptions

Posted by ke...@apache.org.
This closes #2666: Remove some construction-time uses of PipelineOptions

  Supply PipelineOptions at Pipeline.run()
  Move PTransform.validate to post-construction, modulo BigQueryIO
  Remove PipelineOptions from createWriteOperation()
  Move stable name validation to Pipeline.run()
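
Taken together, these patches defer all options-dependent work to
Pipeline.run(). A minimal end-to-end sketch of the resulting API surface
(an editor's illustration built on the create() and run(options) entry
points added in patch [4/5], not code taken from the commits themselves):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class DeferredOptionsExample {
      public static void main(String[] args) {
        // Construction no longer resolves a runner or reads options.
        Pipeline p = Pipeline.create();
        p.apply(Create.of("a", "b", "c"));

        // Options are consumed only here: the runner is resolved from them,
        // and transform validation and stable-name checking happen now.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        p.run(options);
      }
    }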


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5e3f523
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5e3f523
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5e3f523

Branch: refs/heads/master
Commit: f5e3f5230af35da5a03ba9740f087b0f22df6dca
Parents: be88366 77a2545
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Apr 29 12:06:07 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 12:06:07 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ForwardingPTransform.java |   5 +-
 .../construction/ForwardingPTransformTest.java  |   7 +-
 .../construction/PTransformMatchersTest.java    |   4 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../beam/runners/flink/FlinkTestPipeline.java   |   9 +-
 .../beam/runners/dataflow/AssignWindows.java    |   5 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  12 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 133 +++++++++++++------
 .../apache/beam/sdk/coders/CoderRegistry.java   |   9 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |   2 +-
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   3 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |   9 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |   9 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |  27 ++--
 .../apache/beam/sdk/transforms/PTransform.java  |   8 +-
 .../org/apache/beam/sdk/transforms/View.java    |  25 +---
 .../beam/sdk/transforms/windowing/Window.java   |   5 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  39 +++---
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   2 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |   2 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |   4 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  27 ++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  41 ++----
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  12 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |   4 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |   6 +-
 .../hadoop/inputformat/HadoopInputFormatIO.java |  10 +-
 .../inputformat/HadoopInputFormatIOTest.java    |  27 ++--
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |   4 +-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |   1 +
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |   2 +-
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  |   2 +-
 .../java/org/apache/beam/sdk/io/hdfs/Write.java |   7 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      |   2 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |   5 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |   4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  37 ++----
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |   4 +-
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |   4 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   5 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |   2 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   8 +-
 44 files changed, 296 insertions(+), 243 deletions(-)
----------------------------------------------------------------------



[4/5] beam git commit: Supply PipelineOptions at Pipeline.run()

Posted by ke...@apache.org.
Supply PipelineOptions at Pipeline.run()
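
One consequence, visible in the PipelineTest changes below: because
PipelineRunner.fromOptions(options) now instantiates the runner at run()
time from the class named in options.getRunner(), a runner class must
expose a static fromOptions factory. A hedged sketch (hypothetical
ThrowingRunner, patterned on the test mocks updated in this patch):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.runners.PipelineRunner;

    public class ThrowingRunner extends PipelineRunner<PipelineResult> {
      // Located via options.getRunner() and invoked reflectively by
      // PipelineRunner.fromOptions(options) when Pipeline.run() is called.
      public static ThrowingRunner fromOptions(PipelineOptions options) {
        return new ThrowingRunner();
      }

      @Override
      public PipelineResult run(Pipeline pipeline) {
        throw new IllegalStateException("runs only at run(), never at construction");
      }
    }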


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77a25452
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77a25452
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77a25452

Branch: refs/heads/master
Commit: 77a25452f87dd380eed981603252c238089d4439
Parents: 19aa8ba
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 12:50:57 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 12:05:34 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkTestPipeline.java   |  9 +--
 .../runners/dataflow/DataflowRunnerTest.java    | 12 +++-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 68 +++++++++++++-------
 .../apache/beam/sdk/testing/TestPipeline.java   |  9 +--
 .../java/org/apache/beam/sdk/PipelineTest.java  | 28 +++++---
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
index d6240c4..f3498be 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -18,9 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
 
 /**
  * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the
@@ -61,12 +59,11 @@ public class FlinkTestPipeline extends Pipeline {
    */
   private static FlinkTestPipeline create(boolean streaming) {
     TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
-    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+    return new FlinkTestPipeline(flinkRunner.getPipelineOptions());
   }
 
-  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
-              PipelineOptions options) {
-    super(runner, options);
+  private FlinkTestPipeline(PipelineOptions options) {
+    super(options);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 433fb77..c1d3fe6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -252,7 +252,7 @@ public class DataflowRunnerTest {
     };
 
     try {
-      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
       fail();
     } catch (RuntimeException e) {
       assertThat(
@@ -271,7 +271,7 @@ public class DataflowRunnerTest {
     };
 
     try {
-      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
       fail();
     } catch (RuntimeException e) {
       assertThat(
@@ -917,7 +917,13 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions streamingOptions = buildPipelineOptions();
     streamingOptions.setStreaming(true);
     streamingOptions.setRunner(DataflowRunner.class);
-    Pipeline.create(streamingOptions);
+    Pipeline p = Pipeline.create(streamingOptions);
+
+    // Instantiation of a runner prior to run() currently has a side effect of mutating the options.
+    // This could be tested by DataflowRunner.fromOptions(streamingOptions) but would not ensure
+    // that the pipeline itself had the expected options set.
+    p.run();
+
     assertEquals(
         DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
         streamingOptions.getGcsUploadBufferSizeBytes().intValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 203bd14..d578a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -29,10 +29,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
@@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory;
  */
 public class Pipeline {
   private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
-
   /**
    * Thrown during execution of a {@link Pipeline}, whenever user code within that
    * {@link Pipeline} throws an exception.
@@ -133,12 +132,23 @@ public class Pipeline {
   // Public operations.
 
   /**
+   * Constructs a pipeline from default options.
+   *
+   * @return The newly created pipeline.
+   */
+  public static Pipeline create() {
+    Pipeline pipeline = new Pipeline(PipelineOptionsFactory.create());
+    LOG.debug("Creating {}", pipeline);
+    return pipeline;
+  }
+
+  /**
    * Constructs a pipeline from the provided options.
    *
    * @return The newly created pipeline.
    */
   public static Pipeline create(PipelineOptions options) {
-    Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options);
+    Pipeline pipeline = new Pipeline(options);
     LOG.debug("Creating {}", pipeline);
     return pipeline;
   }
@@ -270,9 +280,22 @@ public class Pipeline {
   }
 
   /**
-   * Runs the {@link Pipeline} using its {@link PipelineRunner}.
+   * Runs this {@link Pipeline} using the default {@link PipelineOptions} provided
+   * to {@link #create(PipelineOptions)}.
+   *
+   * <p>It is an error to call this method if the pipeline was created without
+   * a default set of options.
    */
   public PipelineResult run() {
+    return run(defaultOptions);
+  }
+
+  /**
+   * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner
+   * specified by the options.
+   */
+  public PipelineResult run(PipelineOptions options) {
+    PipelineRunner runner = PipelineRunner.fromOptions(options);
     // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
     // pipeline.
     LOG.debug("Running {} via {}", this, runner);
@@ -417,16 +440,14 @@ public class Pipeline {
   /////////////////////////////////////////////////////////////////////////////
   // Below here are internal operations, never called by users.
 
-  private final PipelineRunner<?> runner;
-  private final PipelineOptions options;
   private final TransformHierarchy transforms = new TransformHierarchy(this);
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
+  private final PipelineOptions defaultOptions;
 
-  protected Pipeline(PipelineRunner<?> runner, PipelineOptions options) {
-    this.runner = runner;
-    this.options = options;
+  protected Pipeline(PipelineOptions options) {
+    this.defaultOptions = options;
   }
 
   @Override
@@ -435,6 +456,18 @@ public class Pipeline {
   }
 
   /**
+   * Returns the default {@link PipelineOptions} provided to {@link #create(PipelineOptions)}.
+   *
+   * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
+   *     provided to a transform if it is required.
+   */
+  @Deprecated
+  public PipelineOptions getOptions() {
+    return defaultOptions;
+  }
+
+
+  /**
    * Applies a {@link PTransform} to the given {@link PInput}.
    *
    * @see Pipeline#apply
@@ -516,19 +549,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns the {@link PipelineOptions} provided at the time this {@link Pipeline} was created.
-   *
-   * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
-   *     provided to a transform if it is required. This method will be removed within a Major
-   *     Version and should not be used.
-   */
-  @Deprecated
-  @Experimental
-  public PipelineOptions getOptions() {
-    return options;
-  }
-
-  /**
    * Returns a unique name for a transform with the given prefix (from
    * enclosing transforms) and initial name.
    *
@@ -572,7 +592,9 @@ public class Pipeline {
 
     @Override
     public CompositeBehavior enterCompositeTransform(Node node) {
-      node.getTransform().validate(options);
+      if (node.getTransform() != null) {
+        node.getTransform().validate(options);
+      }
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 1273442..d45106c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.junit.experimental.categories.Category;
@@ -257,12 +256,11 @@ public class TestPipeline extends Pipeline implements TestRule {
   }
 
   public static TestPipeline fromOptions(PipelineOptions options) {
-    return new TestPipeline(PipelineRunner.fromOptions(options), options);
+    return new TestPipeline(options);
   }
 
-  private TestPipeline(
-      final PipelineRunner<? extends PipelineResult> runner, final PipelineOptions options) {
-    super(runner, options);
+  private TestPipeline(final PipelineOptions options) {
+    super(options);
   }
 
   @Override
@@ -316,7 +314,6 @@ public class TestPipeline extends Pipeline implements TestRule {
    * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError} that is raised during
    * testing.
    */
-  @Override
   public PipelineResult run() {
     checkState(
         enforcement.isPresent(),

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5ddfc57..fda64b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -87,16 +87,15 @@ public class PipelineTest {
   @Rule public ExpectedLogs logged = ExpectedLogs.none(Pipeline.class);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  static class PipelineWrapper extends Pipeline {
-    protected PipelineWrapper(PipelineRunner<?> runner) {
-      super(runner, PipelineOptionsFactory.create());
-    }
-  }
-
   // Mock class that throws a user code exception during the call to
   // Pipeline.run().
   static class TestPipelineRunnerThrowingUserException
       extends PipelineRunner<PipelineResult> {
+
+    public static TestPipelineRunnerThrowingUserException fromOptions(PipelineOptions options) {
+      return new TestPipelineRunnerThrowingUserException();
+    }
+
     @Override
     public PipelineResult run(Pipeline pipeline) {
       Throwable t = new IllegalStateException("user code exception");
@@ -106,8 +105,13 @@ public class PipelineTest {
 
   // Mock class that throws an SDK or API client code exception during
   // the call to Pipeline.run().
-  static class TestPipelineRunnerThrowingSDKException
+  static class TestPipelineRunnerThrowingSdkException
       extends PipelineRunner<PipelineResult> {
+
+    public static TestPipelineRunnerThrowingSdkException fromOptions(PipelineOptions options) {
+      return new TestPipelineRunnerThrowingSdkException();
+    }
+
     @Override
     public PipelineResult run(Pipeline pipeline) {
       throw new IllegalStateException("SDK exception");
@@ -116,8 +120,9 @@ public class PipelineTest {
 
   @Test
   public void testPipelineUserExceptionHandling() {
-    Pipeline p = new PipelineWrapper(
-        new TestPipelineRunnerThrowingUserException());
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(TestPipelineRunnerThrowingUserException.class);
+    Pipeline p = Pipeline.create(options);
 
     // Check pipeline runner correctly catches user errors.
     thrown.expect(PipelineExecutionException.class);
@@ -128,7 +133,9 @@ public class PipelineTest {
 
   @Test
   public void testPipelineSDKExceptionHandling() {
-    Pipeline p = new PipelineWrapper(new TestPipelineRunnerThrowingSDKException());
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(TestPipelineRunnerThrowingSdkException.class);
+    Pipeline p = Pipeline.create(options);
 
     // Check pipeline runner correctly catches SDK errors.
     try {
@@ -389,6 +396,7 @@ public class PipelineTest {
 
   @Test
   public void testReplacedNames() {
+    pipeline.enableAbandonedNodeEnforcement(false);
     final PCollection<String> originalInput = pipeline.apply(Create.of("foo", "bar", "baz"));
     class OriginalTransform extends PTransform<PCollection<String>, PCollection<Long>> {
       @Override


[2/5] beam git commit: Remove PipelineOptions from createWriteOperation()

Posted by ke...@apache.org.
Remove PipelineOptions from createWriteOperation()
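
For sink implementors the change is mechanical: drop the parameter and let
the sink capture any configuration it needs at construction time. A minimal
fragment, mirroring the anonymous sink in the PTransformMatchersTest hunk
below (the null body is only a placeholder, exactly as in that test):

    import org.apache.beam.sdk.io.FileBasedSink;

    FileBasedSink<Integer> sink =
        new FileBasedSink<Integer>("foo", "bar") {
          @Override
          public FileBasedWriteOperation<Integer> createWriteOperation() {
            // No PipelineOptions parameter; state must come from the sink.
            return null; // placeholder, as in PTransformMatchersTest
          }
        };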


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4291fa6d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4291fa6d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4291fa6d

Branch: refs/heads/master
Commit: 4291fa6ddc75a7142cacb39025b613eca54c48c3
Parents: f6ebb04
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 13:57:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 10:41:33 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/PTransformMatchersTest.java    | 4 +---
 .../beam/runners/direct/WriteWithShardingFactoryTest.java    | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/AvroIO.java    | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/FileBasedSink.java  | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/TFRecordIO.java     | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/TextIO.java    | 3 +--
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java     | 2 +-
 .../test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java  | 2 +-
 .../src/test/java/org/apache/beam/sdk/io/SimpleSink.java     | 2 +-
 .../main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java  | 2 +-
 .../hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/hdfs/Write.java     | 2 +-
 .../java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java   | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java    | 2 +-
 .../test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java    | 8 ++++----
 15 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index d9bc1e7..9754bb3 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -503,8 +502,7 @@ public class PTransformMatchersTest implements Serializable {
         WriteFiles.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
-              public FileBasedWriteOperation<Integer> createWriteOperation(
-                  PipelineOptions options) {
+              public FileBasedWriteOperation<Integer> createWriteOperation() {
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index b0c9f6d..960640c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -216,7 +216,7 @@ public class WriteWithShardingFactoryTest {
     public void validate(PipelineOptions options) {}
 
     @Override
-    public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<Object> createWriteOperation() {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 24e158f..a48976f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -989,7 +989,7 @@ public class AvroIO {
     }
 
     @Override
-    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
+    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() {
       return new AvroWriteOperation<>(this, coder, codec, metadata);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index b8ad0a6..3354c67 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -449,7 +449,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
    * to the sink.
    */
-  public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
+  public abstract FileBasedWriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
     getFileNamePolicy().populateDisplayData(builder);

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index a920283..1d7477b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -560,7 +560,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public FileBasedWriteOperation<byte[]> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90dd80f..d161d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1041,8 +1041,7 @@ public class TextIO {
     }
 
     @Override
-    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation(
-        PipelineOptions options) {
+    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
       return new TextWriteOperation(this, header, footer);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 2787820..ba41593 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -124,7 +124,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
         WriteFiles.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
-    this.writeOperation = sink.createWriteOperation(options);
+    this.writeOperation = sink.createWriteOperation();
     this.writeOperation.setWindowedWrites(windowedWrites);
     return createWrite(input);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index fe65a83..7efe47c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -531,7 +531,7 @@ public class FileBasedSinkTest {
     final String testUid = "testId";
     SimpleSink.SimpleWriteOperation writeOp =
         new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
-            .createWriteOperation(null);
+            .createWriteOperation();
     final FileBasedWriter<String> writer =
         writeOp.createWriter(null);
     final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index e3cd9b6..8caf004 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -39,7 +39,7 @@ class SimpleSink extends FileBasedSink<String> {
   }
 
   @Override
-  public SimpleWriteOperation createWriteOperation(PipelineOptions options) {
+  public SimpleWriteOperation createWriteOperation() {
     return new SimpleWriteOperation(this);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index aa9e41e..aee73c4 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -250,7 +250,7 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
   }
 
   @Override
-  public Sink.WriteOperation<T, String> createWriteOperation(PipelineOptions options) {
+  public Sink.WriteOperation<T, String> createWriteOperation() {
     return new HDFSWriteOperation<>(this, path(), formatClass());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
index 5a3fcd9..fe2db5f 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -39,7 +39,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
   /**
    * Returns an instance of a {@link WriteOperation} that can write to this Sink.
    */
-  public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
+  public abstract WriteOperation<T, ?> createWriteOperation();
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 8c2fc99..03e7c70 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -104,7 +104,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
         Write.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
-    return createWrite(input, sink.createWriteOperation(options));
+    return createWrite(input, sink.createWriteOperation());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index aeb258f..9fa6606 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -63,7 +63,7 @@ public class HDFSFileSinkTest {
                            PipelineOptions options,
                            Iterable<T> toWrite) throws Exception {
     Sink.WriteOperation<T, String> writeOperation =
-        (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
+        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
     Sink.Writer<T, String> writer = writeOperation.createWriter(options);
     writer.openUnwindowed(UUID.randomUUID().toString(),  -1, -1);
     for (T t: toWrite) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index a1ebf6c..8a1621e 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -53,7 +53,7 @@ class XmlSink<T> extends FileBasedSink<T> {
    * Creates an {@link XmlWriteOperation}.
    */
   @Override
-  public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+  public XmlWriteOperation<T> createWriteOperation() {
     return new XmlWriteOperation<>(this);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4291fa6d/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index bf15cfe..7f9a8c5 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -76,7 +76,7 @@ public class XmlSinkTest {
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
 
     List<Bird> bundle =
@@ -97,7 +97,7 @@ public class XmlSinkTest {
             .withRootElement("birds")
             .withCharset(StandardCharsets.ISO_8859_1)
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
 
     List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon"));
@@ -155,7 +155,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement)
             .toFilenamePrefix(testFilePrefix)
             .createSink();
-    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
+    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
@@ -175,7 +175,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement)
             .toFilenamePrefix(testFilePrefix)
             .createSink()
-            .createWriteOperation(options);
+            .createWriteOperation();
     XmlWriter<Bird> writer = writeOp.createWriter(options);
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();