Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/29 19:06:45 UTC

[3/5] beam git commit: Move PTransform.validate to post-construction, modulo BigQueryIO

Move PTransform.validate to post-construction, modulo BigQueryIO

PipelineOptions, which most validation depends on, should not be
available at construction time. Instead, validation is now invoked
just before a pipeline is run.

BigQueryIO is particularly problematic and will get further treatment.
For now, the workaround is to define the proper validation methods
but still to call them, incorrectly, at construction time from
expand().
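
For transform authors, the new contract looks roughly like the sketch
below. The transform and its checks are hypothetical, not part of this
commit; only the validate(PipelineOptions) signature and its pre-run
timing come from the change itself:

    import static com.google.common.base.Preconditions.checkArgument;

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Hypothetical write-style transform illustrating the new contract.
    class WriteToStore extends PTransform<PCollection<String>, PDone> {
      private final String tableId;

      WriteToStore(String tableId) {
        this.tableId = tableId;
      }

      @Override
      public void validate(PipelineOptions options) {
        // Called just before the pipeline runs, once options are available.
        checkArgument(tableId != null && !tableId.isEmpty(), "Table ID not specified");
      }

      @Override
      public PDone expand(PCollection<String> input) {
        // Checks that depend on the input still belong here, at construction time.
        return PDone.in(input.getPipeline());
      }
    }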


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19aa8ba5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19aa8ba5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19aa8ba5

Branch: refs/heads/master
Commit: 19aa8ba576c2d43166dc6d67a4c5c103b3522870
Parents: 4291fa6
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 24 14:37:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Apr 29 11:39:00 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ForwardingPTransform.java |  5 ++-
 .../construction/ForwardingPTransformTest.java  |  7 ++--
 .../beam/runners/dataflow/AssignWindows.java    |  5 ++-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 22 ++++++++++-
 .../apache/beam/sdk/coders/CoderRegistry.java   |  9 ++++-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  7 +++-
 .../apache/beam/sdk/transforms/GroupByKey.java  | 27 ++++++-------
 .../apache/beam/sdk/transforms/PTransform.java  |  8 ++--
 .../org/apache/beam/sdk/transforms/View.java    | 25 +++---------
 .../beam/sdk/transforms/windowing/Window.java   |  5 ++-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |  4 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 27 +++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 41 ++++++--------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 +++---
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  4 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  6 +--
 .../hadoop/inputformat/HadoopInputFormatIO.java | 10 ++---
 .../inputformat/HadoopInputFormatIOTest.java    | 27 +++++++------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |  4 +-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  1 +
 .../java/org/apache/beam/sdk/io/hdfs/Write.java |  7 +++-
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     |  5 ++-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 37 ++++++------------
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |  4 +-
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |  4 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  5 ++-
 27 files changed, 168 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index 3bee281..2f427ad 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
 
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PInput;
@@ -40,8 +41,8 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
   }
 
   @Override
-  public void validate(InputT input) {
-    delegate().validate(input);
+  public void validate(PipelineOptions options) {
+    delegate().validate(options);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 7d3bfd8..74c056c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -79,11 +80,11 @@ public class ForwardingPTransformTest {
   @Test
   public void validateDelegates() {
     @SuppressWarnings("unchecked")
-    PCollection<Integer> input = Mockito.mock(PCollection.class);
-    Mockito.doThrow(RuntimeException.class).when(delegate).validate(input);
+    PipelineOptions options = Mockito.mock(PipelineOptions.class);
+    Mockito.doThrow(RuntimeException.class).when(delegate).validate(options);
 
     thrown.expect(RuntimeException.class);
-    forwarding.validate(input);
+    forwarding.validate(options);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 3e36899..ffd56c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -73,8 +74,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
   }
 
   @Override
-  public void validate(PCollection<T> input) {
-    transform.validate(input);
+  public void validate(PipelineOptions options) {
+    transform.validate(options);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 716b328..203bd14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -454,7 +454,6 @@ public class Pipeline {
     transforms.pushNode(uniqueName, input, transform);
     try {
       transforms.finishSpecifyingInput();
-      transform.validate(input);
       OutputT output = transform.expand(input);
       transforms.setOutput(output);
 
@@ -493,6 +492,7 @@ public class Pipeline {
 
   @VisibleForTesting
   void validate(PipelineOptions options) {
+    this.traverseTopologically(new ValidateVisitor(options));
     if (!unstableNames.isEmpty()) {
       switch (options.getStableUniqueNames()) {
         case OFF:
@@ -561,4 +561,24 @@ public class Pipeline {
   private String buildName(String namePrefix, String name) {
     return namePrefix.isEmpty() ? name : namePrefix + "/" + name;
   }
+
+  private static class ValidateVisitor extends PipelineVisitor.Defaults {
+
+    private final PipelineOptions options;
+
+    public ValidateVisitor(PipelineOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(Node node) {
+      node.getTransform().validate(options);
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      node.getTransform().validate(options);
+    }
+  }
 }
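
In user-facing terms, the ValidateVisitor above moves the failure point
for options-dependent checks from apply() to run(). A rough sketch
(SomeIO is illustrative, not a real connector):

    Pipeline p = Pipeline.create(options);
    p.apply(SomeIO.read());  // options-dependent checks no longer throw here
    p.run();                 // Pipeline.validate(options) visits every node first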

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index ab0a3e1..7f0f632 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -255,6 +255,9 @@ public class CoderRegistry implements CoderProvider {
       TypeDescriptor<InputT> inputTypeDescriptor,
       Coder<InputT> inputCoder)
       throws CannotProvideCoderException {
+    checkArgument(typeDescriptor != null);
+    checkArgument(inputTypeDescriptor != null);
+    checkArgument(inputCoder != null);
     return getDefaultCoder(
         typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
   }
@@ -859,6 +862,8 @@ public class CoderRegistry implements CoderProvider {
    * in the given {@link Coder}.
    */
   private Map<Type, Coder<?>> getTypeToCoderBindings(Type type, Coder<?> coder) {
+    checkArgument(type != null);
+    checkArgument(coder != null);
     if (type instanceof TypeVariable || type instanceof Class) {
       return ImmutableMap.<Type, Coder<?>>of(type, coder);
     } else if (type instanceof ParameterizedType) {
@@ -889,7 +894,9 @@ public class CoderRegistry implements CoderProvider {
       for (int i = 0; i < typeArguments.size(); i++) {
         Type typeArgument = typeArguments.get(i);
         Coder<?> coderArgument = coderArguments.get(i);
-        typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument));
+        if (coderArgument != null) {
+          typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument));
+        }
       }
 
       return ImmutableMap.<Type, Coder<?>>builder().putAll(typeToCoder).build();

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index ba41593..dcd600f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -122,14 +122,17 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
         "%s can only be applied to an unbounded PCollection if doing windowed writes",
         WriteFiles.class.getSimpleName());
-    PipelineOptions options = input.getPipeline().getOptions();
-    sink.validate(options);
     this.writeOperation = sink.createWriteOperation();
     this.writeOperation.setWindowedWrites(windowedWrites);
     return createWrite(input);
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
+
+  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder
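
WriteFiles (and the near-identical HDFS Write later in this diff) now
defers sink validation by delegating from the transform's own hook, so
FileBasedSink checks also run just before the pipeline does rather than
eagerly in expand(). The delegation, as in the hunk above:

    @Override
    public void validate(PipelineOptions options) {
      // Previously invoked eagerly in expand(); now deferred to pre-run validation.
      sink.validate(options);
    }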

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index d9c4c9f..6a89c5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -184,21 +184,6 @@ public class GroupByKey<K, V>
     }
   }
 
-  @Override
-  public void validate(PCollection<KV<K, V>> input) {
-    applicableTo(input);
-
-    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
-    // the key coder is deterministic.
-    Coder<K> keyCoder = getKeyCoder(input.getCoder());
-    try {
-      keyCoder.verifyDeterministic();
-    } catch (NonDeterministicException e) {
-      throw new IllegalStateException(
-          "the keyCoder of a GroupByKey must be deterministic", e);
-    }
-  }
-
   public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
     WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
     if (!inputWindowFn.isNonMerging()) {
@@ -216,6 +201,18 @@ public class GroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    applicableTo(input);
+
+    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
+    // the key coder is deterministic.
+    Coder<K> keyCoder = getKeyCoder(input.getCoder());
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalStateException(
+          "the keyCoder of a GroupByKey must be deterministic", e);
+    }
+
     // This primitive operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 687938d..4f651f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.NameUtils;
@@ -187,13 +188,12 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
   public abstract OutputT expand(InputT input);
 
   /**
-   * Called before invoking apply (which may be intercepted by the runner) to
-   * verify this transform is fully specified and applicable to the specified
-   * input.
+   * Called before running the Pipeline to verify this transform is fully and correctly
+   * specified.
    *
    * <p>By default, does nothing.
    */
-  public void validate(InputT input) {}
+  public void validate(PipelineOptions options) {}
 
   /**
    * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 14035b0..0495ad6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -246,16 +246,13 @@ public class View {
     private AsList() { }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
           input, input.getWindowingStrategy(), input.getCoder())));
     }
@@ -272,16 +269,13 @@ public class View {
     private AsIterable() { }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
           input, input.getWindowingStrategy(), input.getCoder())));
     }
@@ -329,16 +323,13 @@ public class View {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public PCollectionView<T> expand(PCollection<T> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
       Combine.Globally<T, T> singletonCombine =
           Combine.globally(new SingletonCombineFn<>(hasDefault, input.getCoder(), defaultValue));
       if (!hasDefault) {
@@ -415,16 +406,13 @@ public class View {
     private AsMultimap() { }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
           PCollectionViews.multimapView(
               input,
@@ -452,16 +440,13 @@ public class View {
     }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       try {
         GroupByKey.applicableTo(input);
       } catch (IllegalStateException e) {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
-    }
 
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
           PCollectionViews.mapView(
               input,

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index cb7b430..d010d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -324,8 +324,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     return result;
   }
 
-  @Override
-  public void validate(PCollection<T> input) {
+  private void applicableTo(PCollection<?> input) {
     WindowingStrategy<?, ?> outputStrategy =
         getOutputStrategyInternal(input.getWindowingStrategy());
 
@@ -363,6 +362,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
 
   @Override
   public PCollection<T> expand(PCollection<T> input) {
+    applicableTo(input);
+
     WindowingStrategy<?, ?> outputStrategy =
         getOutputStrategyInternal(input.getWindowingStrategy());
     if (getWindowFn() == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 8e138ef..0a3b900 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -399,7 +399,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(
           getConnectionConfiguration() != null,
           "ElasticsearchIO.read() requires a connection configuration"
@@ -725,7 +725,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public void validate(PCollection<String> input) {
+    public void validate(PipelineOptions options) {
       checkState(
           getConnectionConfiguration() != null,
           "ElasticsearchIO.write() requires a connection configuration"

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 38a0f6c..593c580 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -18,8 +18,11 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +34,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -45,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -77,10 +82,32 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    // We will use a BigQuery load job -- validate the temp location.
+    String tempLocation = options.getTempLocation();
+    checkArgument(
+        !Strings.isNullOrEmpty(tempLocation),
+        "BigQueryIO.Write needs a GCS temp location to store temp files.");
+    if (write.getBigQueryServices() == null) {
+      try {
+        GcsPath.fromUri(tempLocation);
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                tempLocation),
+            e);
+      }
+    }
+  }
+
+  @Override
   public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
 
+    validate(p.getOptions());
+
     final String stepUuid = BigQueryHelpers.randomUUIDString();
 
     String tempLocation = options.getTempLocation();

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7ade33f..ea97906 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -388,10 +388,10 @@ public class BigQueryIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       // Even if existence validation is disabled, we need to make sure that the BigQueryIO
       // read is properly specified.
-      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
 
       String tempLocation = bqOptions.getTempLocation();
       checkArgument(
@@ -905,8 +905,8 @@ public class BigQueryIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
-      BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    public void validate(PipelineOptions pipelineOptions) {
+      BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
 
       // We must have a destination to write to!
       checkState(getTableFunction() != null,
@@ -939,35 +939,13 @@ public class BigQueryIO {
           BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
         }
       }
-
-      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
-        // We will use BigQuery's streaming write API -- validate supported dispositions.
-        checkArgument(
-            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
-            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
-            + " PCollection.");
-      } else {
-        // We will use a BigQuery load job -- validate the temp location.
-        String tempLocation = options.getTempLocation();
-        checkArgument(
-            !Strings.isNullOrEmpty(tempLocation),
-            "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (getBigQueryServices() == null) {
-          try {
-            GcsPath.fromUri(tempLocation);
-          } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException(
-                String.format(
-                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                    tempLocation),
-                e);
-          }
-        }
-      }
     }
 
     @Override
     public WriteResult expand(PCollection<T> input) {
+
+      validate(input.getPipeline().getOptions());
+
       PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
           input.apply("PrepareWrite", new PrepareWrite<T>(
               getTableFunction(), getFormatFunction()))
@@ -977,6 +955,11 @@ public class BigQueryIO {
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamingInserts and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED) {
+        checkArgument(
+            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
+            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
+                + " PCollection.");
+
         return rowsWithDestination.apply(new StreamingInserts(this));
       } else {
         return rowsWithDestination.apply(new BatchLoads(this));
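
This is the workaround named in the commit message: BigQueryIO.Write's
expand() invokes the post-construction hook eagerly, so its option
checks still fire at construction time until the transform is
restructured. Reduced to its essentials (expandInternal is a
hypothetical stand-in for the remainder of expand()):

    @Override
    public WriteResult expand(PCollection<T> input) {
      // Temporary: run pre-run validation eagerly; to be removed once
      // BigQueryIO no longer needs PipelineOptions at construction time.
      validate(input.getPipeline().getOptions());
      return expandInternal(input);  // hypothetical remainder of expand()
    }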

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ca7460..69fac68 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -286,12 +286,12 @@ public class BigtableIO {
     }
 
     @Override
-    public void validate(PBegin input) {
-      checkArgument(options != null, "BigtableOptions not specified");
+    public void validate(PipelineOptions options) {
+      checkArgument(this.options != null, "BigtableOptions not specified");
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            getBigtableService(options).tableExists(tableId),
             "Table %s does not exist",
             tableId);
       } catch (IOException e) {
@@ -492,12 +492,12 @@ public class BigtableIO {
     }
 
     @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
+    public void validate(PipelineOptions options) {
+      checkArgument(this.options != null, "BigtableOptions not specified");
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            getBigtableService(options).tableExists(tableId),
             "Table %s does not exist",
             tableId);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index aa0019c..f619429 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -591,7 +591,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getProjectId(), "projectId");
 
       if (getProjectId().isAccessible() && getProjectId().get() == null) {
@@ -1068,7 +1068,7 @@ public class DatastoreV1 {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(projectId, "projectId ValueProvider");
       if (projectId.isAccessible()) {
         checkNotNull(projectId.get(), "projectId");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 9e93887..0b94ded 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -270,8 +270,6 @@ public class BigtableIOTest {
   /** Tests that when reading from a non-existent table, the read fails. */
   @Test
   public void testReadingFailsTableDoesNotExist() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
     final String table = "TEST-TABLE";
 
     BigtableIO.Read read =
@@ -285,6 +283,7 @@ public class BigtableIOTest {
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
     p.apply(read);
+    p.run();
   }
 
   /** Tests that when reading from an empty table, the read succeeds. */
@@ -615,8 +614,6 @@ public class BigtableIOTest {
   /** Tests that when writing to a non-existent table, the write fails. */
   @Test
   public void testWritingFailsTableDoesNotExist() throws Exception {
-    p.enableAbandonedNodeEnforcement(false);
-
     final String table = "TEST-TABLE";
 
     PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
@@ -629,6 +626,7 @@ public class BigtableIOTest {
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
     emptyInput.apply("write", defaultWrite.withTableId(table));
+    p.run();
   }
 
   /** Tests that when writing an element fails, the write fails. */
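
These test changes follow from the new timing: table-existence checks
now run in validate(PipelineOptions), so applying the transform no
longer throws and the tests must actually run the pipeline (which also
makes disabling abandoned-node enforcement unnecessary). The resulting
pattern, sketched:

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Table TEST-TABLE does not exist");
    p.apply(read);  // applies cleanly now
    p.run();        // validation visits the transform and throws here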

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 93ff108..336740c 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -23,7 +23,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -39,9 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -288,6 +285,7 @@ public class HadoopInputFormatIO {
 
     @Override
     public PCollection<KV<K, V>> expand(PBegin input) {
+      validateTransform();
       // Get the key and value coders based on the key and value classes.
       CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
       Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry);
@@ -315,10 +313,10 @@ public class HadoopInputFormatIO {
     }
 
     /**
-     * Validates inputs provided by the pipeline user before reading the data.
+     * Validates construction of this transform.
      */
-    @Override
-    public void validate(PBegin input) {
+    @VisibleForTesting
+    void validateTransform() {
       checkNotNull(getConfiguration(), "getConfiguration()");
       // Validate that the key translation input type must be same as key class of InputFormat.
       validateTranslationFunction(getinputFormatKeyClass(), getKeyTranslationFunction(),

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index ccde03f..aeeeb17 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertThat;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -272,15 +271,15 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when Read transform is created without calling
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when Read transform is created without calling
    * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}.
    */
   @Test
   public void testReadValidationFailsMissingConfiguration() {
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read();
     thrown.expect(NullPointerException.class);
-    read.validate(input);
+    read.validateTransform();
   }
 
   /**
@@ -328,10 +327,10 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myKeyTranslate's (simple function provided by user for key
-   * translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set in
-   * configuration as "key.class").
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when myKeyTranslate's (simple function provided by user for
+   * key translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set
+   * in configuration as "key.class").
    */
   @Test
   public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
@@ -351,14 +350,14 @@ public class HadoopInputFormatIOTest {
         serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
             InputFormat.class), serConf.getHadoopConfiguration()
             .getClass("key.class", Object.class)));
-    read.validate(input);
+    read.validateTransform();
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#validate()
-   * Read.validate()} function when myValueTranslate's (simple function provided by user for value
-   * translation) input type is not same as Hadoop InputFormat's valueClass(Which is property set in
-   * configuration as "value.class").
+   * This test validates functionality of {@link HadoopInputFormatIO.Read#validateTransform()
+   * Read.validateTransform()} function when myValueTranslate's (simple function provided by user
+   * for value translation) input type is not same as Hadoop InputFormat's valueClass(Which is
+   * property set in configuration as "value.class").
    */
   @Test
   public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() {
@@ -382,7 +381,7 @@ public class HadoopInputFormatIOTest {
             serConf.getHadoopConfiguration().getClass("value.class", Object.class));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(expectedMessage);
-    read.validate(input);
+    read.validateTransform();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 1c8afbd..eee8927 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -255,7 +255,7 @@ public class HBaseIO {
         }
 
         @Override
-        public void validate(PBegin input) {
+        public void validate(PipelineOptions options) {
             checkArgument(serializableConfiguration != null,
                     "Configuration not provided");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
@@ -580,7 +580,7 @@ public class HBaseIO {
         }
 
         @Override
-        public void validate(PCollection<KV<byte[], Iterable<Mutation>>> input) {
+        public void validate(PipelineOptions options) {
             checkArgument(serializableConfiguration != null, "Configuration not specified");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
             try (Connection connection = ConnectionFactory.createConnection(

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index bf8cb4b..6308931 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -292,6 +292,7 @@ public class HBaseIOTest {
         thrown.expectMessage(String.format("Table %s does not exist", table));
 
         emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.run();
     }
 
     /** Tests that when writing an element fails, the write fails. */

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
index 03e7c70..86a9246 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -102,12 +102,15 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
         "%s can only be applied to an unbounded PCollection if doing windowed writes",
         Write.class.getSimpleName());
-    PipelineOptions options = input.getPipeline().getOptions();
-    sink.validate(options);
     return createWrite(input, sink.createWriteOperation());
   }
 
   @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
+
+  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index b26a47d..2d48236 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -32,6 +32,7 @@ import javax.sql.DataSource;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -340,7 +341,7 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(getQuery() != null,
           "JdbcIO.read() requires a query to be set via withQuery(query)");
       checkState(getRowMapper() != null,
@@ -445,7 +446,7 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkArgument(getDataSourceConfiguration() != null,
           "JdbcIO.write() requires a configuration to be set via "
               + ".withDataSourceConfiguration(configuration)");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 104bea4..813e051 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -295,7 +295,7 @@ public class JmsIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkState(getConnectionFactory() != null, "JmsIO.read() requires a JMS connection "
           + "factory to be set via withConnectionFactory(connectionFactory)");
       checkState((getQueue() != null || getTopic() != null), "JmsIO.read() requires a JMS "
@@ -654,7 +654,7 @@ public class JmsIO {
     }
 
     @Override
-    public void validate(PCollection<String> input) {
+    public void validate(PipelineOptions options) {
       checkState(getConnectionFactory() != null, "JmsIO.write() requires a JMS connection "
           + "factory to be set via withConnectionFactory(connectionFactory)");
       checkState((getQueue() != null || getTopic() != null), "JmsIO.write() requires a JMS "

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 211f1a4..000df70 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -617,30 +617,13 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PBegin input)  {
+    public void validate(PipelineOptions options) {
       checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
           "Kafka topics or topic_partitions are required");
       checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
       checkNotNull(getValueDeserializer(), "Value deserializer must be set");
-
-      if (input != null) {
-        CoderRegistry registry = input.getPipeline().getCoderRegistry();
-
-        checkNotNull(getKeyCoder() != null
-                        ? getKeyCoder()
-                        : inferCoder(registry, getKeyDeserializer()),
-                "Key coder must be set");
-
-        checkNotNull(getValueCoder() != null
-                        ? getValueCoder()
-                        : inferCoder(registry, getValueDeserializer()),
-                "Value coder must be set");
-      } else {
-        checkNotNull(getKeyCoder(), "Key coder must be set");
-        checkNotNull(getValueCoder(), "Value coder must be set");
-      }
     }
 
     @Override
@@ -648,13 +631,17 @@ public class KafkaIO {
       // Infer key/value coders if not specified explicitly
       CoderRegistry registry = input.getPipeline().getCoderRegistry();
 
-      Coder<K> keyCoder = getKeyCoder() != null
-              ? getKeyCoder()
-              : inferCoder(registry, getKeyDeserializer());
+      Coder<K> keyCoder =
+          checkNotNull(
+              getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
+              "Key coder must be set");
 
-      Coder<V> valueCoder = getValueCoder() != null
-              ? getValueCoder()
-              : inferCoder(registry, getValueDeserializer());
+      Coder<V> valueCoder =
+          checkNotNull(
+              getValueCoder() != null
+                  ? getValueCoder()
+                  : inferCoder(registry, getValueDeserializer()),
+              "Value coder must be set");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -1523,7 +1510,7 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PCollection<KV<K, V>> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 2b7fb0a..f8edbf1 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -167,7 +167,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(uri(), "uri");
       checkNotNull(database(), "database");
       checkNotNull(collection(), "collection");
@@ -444,7 +444,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public void validate(PCollection<Document> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(uri(), "uri");
       checkNotNull(database(), "database");
       checkNotNull(collection(), "collection");

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 1df445f..0f25b0f 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -291,7 +291,7 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       // validation is performed in the ConnectionConfiguration create()
     }
 
@@ -541,7 +541,7 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PCollection<byte[]> input) {
+    public void validate(PipelineOptions options) {
       // validate is done in connection configuration
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/19aa8ba5/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 47715e9..c41d6bc 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CompressedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -381,7 +382,7 @@ public class XmlIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(
           getRootElement(),
           "rootElement is null. Use builder method withRootElement() to set this.");
@@ -505,7 +506,7 @@ public class XmlIO {
     }
 
     @Override
-    public void validate(PCollection<T> input) {
+    public void validate(PipelineOptions options) {
       checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
       checkNotNull(getRootElement(), "Missing a root element name.");
       checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");