You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:04 UTC

[40/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
deleted file mode 100644
index 0b78b83..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import org.joda.time.Instant;
-
-import java.util.UUID;
-
-/**
- * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
- * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}.  In the case of an empty PCollection, only
- * the global initialization and finalization will be performed.
- *
- * <p>Currently, only batch workflows can contain Write transforms.
- *
- * <p>Example usage:
- *
- * <p>{@code p.apply(Write.to(new MySink(...)));}
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
-  /**
-   * Creates a Write transform that writes to the given Sink.
-   */
-  public static <T> Bound<T> to(Sink<T> sink) {
-    return new Bound<>(sink);
-  }
-
-  /**
-   * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for
-   * documentation about writing to Sinks.
-   */
-  public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-    private final Sink<T> sink;
-
-    private Bound(Sink<T> sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      PipelineOptions options = input.getPipeline().getOptions();
-      sink.validate(options);
-      return createWrite(input, sink.createWriteOperation(options));
-    }
-
-    /**
-     * Returns the {@link Sink} associated with this PTransform.
-     */
-    public Sink<T> getSink() {
-      return sink;
-    }
-
-    /**
-     * A write is performed as a sequence of three {@link ParDo}'s.
-     *
-     * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-     * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-     * called. The output of this ParDo is a singleton PCollection
-     * containing the WriteOperation.
-     *
-     * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-     * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-     * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. The writer is
-     * opened lazily on the first element of a bundle and closed in {@link DoFn#finishBundle}, and
-     * {@link Writer#write} is called for every element in the bundle. Once closed, the writer is
-     * dropped so that a later bundle opens a fresh one. The output of this ParDo is a PCollection
-     * of <i>writer result</i> objects (see {@link Sink} for a description)-one for each bundle.
-     *
-     * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-     * the collection of writer results as a side-input. In this ParDo,
-     * {@link WriteOperation#finalize} is called to finalize the write.
-     *
-     * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-     * before the exception that caused the write to fail is propagated and the write result will be
-     * discarded.
-     *
-     * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-     * deserialized in the bundle-writing and finalization phases, any state change to the
-     * WriteOperation object that occurs during initialization is visible in the latter phases.
-     * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
-     * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-     * WriteOperation.
-     */
-    private <WriteT> PDone createWrite(
-        PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-      Pipeline p = input.getPipeline();
-
-      // A coder to use for the WriteOperation.
-      @SuppressWarnings("unchecked")
-      Coder<WriteOperation<T, WriteT>> operationCoder =
-          (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-      // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-      // the sink.
-      PCollection<WriteOperation<T, WriteT>> operationCollection =
-          p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));
-
-      // Initialize the resource in a do-once ParDo on the WriteOperation.
-      operationCollection = operationCollection
-          .apply("Initialize", ParDo.of(
-              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              writeOperation.initialize(c.getPipelineOptions());
-              // The WriteOperation is also the output of this ParDo, so it can have mutable state.
-              c.output(writeOperation);
-            }
-          }))
-          .setCoder(operationCoder);
-
-      // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-      final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-          operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-      // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-      // as a side input) and collect the results of the writes in a PCollection.
-      // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-      // as a side input), so this will happen after the initial ParDo.
-      PCollection<WriteT> results = input
-          .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
-            // Writer for the records in this bundle; lazily initialized in processElement.
-            private Writer<T, WriteT> writer = null;
-
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              // Lazily initialize the Writer
-              if (writer == null) {
-                WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-                writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-              }
-              try {
-                writer.write(c.element());
-              } catch (Exception e) {
-                // Discard the write result and close the writer.
-                try {
-                  writer.close();
-                } catch (Exception closeException) {
-                  // Do not mask the exception that caused the write to fail.
-                }
-                writer = null;  // Never touch the closed writer again (no second close/output).
-                throw e;
-              }
-            }
-
-            @Override
-            public void finishBundle(Context c) throws Exception {
-              if (writer != null) {
-                WriteT result = writer.close();
-                writer = null;  // Reset so a reused DoFn instance opens a fresh writer.
-                // Output the result of the write.
-                c.outputWithTimestamp(result, Instant.now());
-              }
-            }
-          }).withSideInputs(writeOperationView))
-          .setCoder(writeOperation.getWriterResultCoder())
-          .apply(Window.<WriteT>into(new GlobalWindows()));
-
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // WriteOperation. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-      // collection as a side input), so it will happen after the parallel write.
-      @SuppressWarnings("unused")
-      final PCollection<Integer> done = operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              Iterable<WriteT> results = c.sideInput(resultsView);
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              writeOperation.finalize(results, c.getPipelineOptions());
-            }
-          }).withSideInputs(resultsView));
-      return PDone.in(input.getPipeline());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
deleted file mode 100644
index b728c0a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Preconditions;
-
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
- * records from JAXB-annotated classes to a single file location.
- *
- * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, this
- * Sink will produce a single file consisting of a single root element that contains all of the
- * elements in the PCollection.
- *
- * <p>XML Sinks are created with a base filename to write to, a root element name that will be used
- * for the root element of the output files, and a class to bind to an XML element. This class
- * will be used in the marshalling of records in an input PCollection to their XML representation
- * and must be able to be bound using JAXB annotations (checked at pipeline construction time).
- *
- * <p>XML Sinks can be written to using the {@link Write} transform:
- *
- * <pre>
- * p.apply(Write.to(
- *      XmlSink.write().ofRecordClass(Type.class)
- *          .withRootElement(root_element)
- *          .toFilenamePrefix(output_filename)));
- * </pre>
- *
- * <p>For example, consider the following class with JAXB annotations:
- *
- * <pre>
- *  {@literal @}XmlRootElement(name = "word_count_result")
- *  {@literal @}XmlType(propOrder = {"word", "frequency"})
- *  public class WordFrequency {
- *    private String word;
- *    private long frequency;
- *
- *    public WordFrequency() { }
- *
- *    public WordFrequency(String word, long frequency) {
- *      this.word = word;
- *      this.frequency = frequency;
- *    }
- *
- *    public void setWord(String word) {
- *      this.word = word;
- *    }
- *
- *    public void setFrequency(long frequency) {
- *      this.frequency = frequency;
- *    }
- *
- *    public long getFrequency() {
- *      return frequency;
- *    }
- *
- *    public String getWord() {
- *      return word;
- *    }
- *  }
- * </pre>
- *
- * <p>The following will produce XML output with a root element named "words" from a PCollection of
- * WordFrequency objects:
- * <pre>
- * p.apply(Write.to(
- *  XmlSink.write().ofRecordClass(WordFrequency.class)
- *      .withRootElement("words")
- *      .toFilenamePrefix(output_file)));
- * </pre>
- *
- * <p>The output of which will look like:
- * <pre>
- * {@code
- * <words>
- *
- *  <word_count_result>
- *    <word>decreased</word>
- *    <frequency>1</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>War</word>
- *    <frequency>4</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>empress'</word>
- *    <frequency>14</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>stoops</word>
- *    <frequency>6</frequency>
- *  </word_count_result>
- *
- *  ...
- * </words>
- * }</pre>
- */
-// CHECKSTYLE.ON: JavadocStyle
-@SuppressWarnings("checkstyle:javadocstyle")
-public class XmlSink {
-  protected static final String XML_EXTENSION = "xml";
-
-  /**
-   * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root
-   * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link
-   * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively.
-   */
-  public static Bound<?> write() {
-    return new Bound<>(null, null, null);
-  }
-
-  /**
-   * Returns an XmlSink that writes objects as XML entities.
-   *
-   * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n
-   * is the number of output bundles that the Dataflow service divides the output into.
-   *
-   * @param klass the class of the elements to write.
-   * @param rootElementName the enclosing root element.
-   * @param baseOutputFilename the output filename prefix.
-   */
-  public static <T> Bound<T> writeOf(
-      Class<T> klass, String rootElementName, String baseOutputFilename) {
-    return new Bound<>(klass, rootElementName, baseOutputFilename);
-  }
-
-  /**
-   * A {@link FileBasedSink} that writes objects as XML elements.
-   */
-  public static class Bound<T> extends FileBasedSink<T> {
-    final Class<T> classToBind;
-    final String rootElementName;
-
-    private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) {
-      super(baseOutputFilename, XML_EXTENSION);
-      this.classToBind = classToBind;
-      this.rootElementName = rootElementName;
-    }
-
-    /**
-     * Returns an XmlSink that writes objects of the class specified as XML elements. The returned
-     * sink keeps this sink's root element name and filename prefix.
-     *
-     * <p>The specified class must be able to be used to create a JAXB context.
-     */
-    public <RecordT> Bound<RecordT> ofRecordClass(Class<RecordT> classToBind) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Returns an XmlSink that writes to files with the given prefix.
-     *
-     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
-     * the number of output bundles that the Dataflow service divides the output into.
-     */
-    public Bound<T> toFilenamePrefix(String baseOutputFilename) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Returns an XmlSink that writes XML files with an enclosing root element of the
-     * supplied name.
-     */
-    public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
-     * been set and that the class can be bound in a JAXB context.
-     */
-    @Override
-    public void validate(PipelineOptions options) {
-      Preconditions.checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
-      Preconditions.checkNotNull(rootElementName, "Missing a root element name.");
-      Preconditions.checkNotNull(baseOutputFilename, "Missing a filename to write to.");
-      try {
-        JAXBContext.newInstance(classToBind);
-      } catch (JAXBException e) {
-        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
-      }
-    }
-
-    /**
-     * Creates an {@link XmlWriteOperation}.
-     */
-    @Override
-    public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new XmlWriteOperation<>(this);
-    }
-  }
-
-  /**
-   * {@link Sink.WriteOperation} for XML {@link Sink}s.
-   */
-  protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
-    public XmlWriteOperation(XmlSink.Bound<T> sink) {
-      super(sink);
-    }
-
-    /**
-     * Creates a {@link XmlWriter} with a marshaller for the type it will write.
-     */
-    @Override
-    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
-      JAXBContext context;
-      Marshaller marshaller;
-      context = JAXBContext.newInstance(getSink().classToBind);
-      marshaller = context.createMarshaller();
-      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
-      return new XmlWriter<>(this, marshaller);
-    }
-
-    /**
-     * Returns the XmlSink.Bound for this write operation.
-     */
-    @Override
-    public XmlSink.Bound<T> getSink() {
-      return (XmlSink.Bound<T>) super.getSink();
-    }
-  }
-
-  /**
-   * A {@link Sink.Writer} that can write objects as XML elements.
-   */
-  protected static final class XmlWriter<T> extends FileBasedWriter<T> {
-    final Marshaller marshaller;
-    private OutputStream os = null;
-
-    public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
-      super(writeOperation);
-      this.marshaller = marshaller;
-    }
-
-    /**
-     * Creates the output stream that elements will be written to.
-     */
-    @Override
-    protected void prepareWrite(WritableByteChannel channel) throws Exception {
-      os = Channels.newOutputStream(channel);
-    }
-
-    /**
-     * Writes the root element opening tag.
-     */
-    @Override
-    protected void writeHeader() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
-    }
-
-    /**
-     * Writes the root element closing tag.
-     */
-    @Override
-    protected void writeFooter() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
-    }
-
-    /**
-     * Writes a value to the stream.
-     */
-    @Override
-    public void write(T value) throws Exception {
-      marshaller.marshal(value, os);
-    }
-
-    /**
-     * Returns the XmlWriteOperation this writer belongs to.
-     */
-    @Override
-    public XmlWriteOperation<T> getWriteOperation() {
-      return (XmlWriteOperation<T>) super.getWriteOperation();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
deleted file mode 100644
index 1ead391..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.common.base.Preconditions;
-
-import org.codehaus.stax2.XMLInputFactory2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.ValidationEvent;
-import javax.xml.bind.ValidationEventHandler;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@code PCollection} of a given type. A Dataflow read transform can be
- * created by passing an {@code XmlSource} object to {@code Read.from()}. Please note the
- * example given below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>Basically, the XML document should contain a single root element with an inner list consisting
- * entirely of record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Additionally, users must provide a class of a JAXB annotated Java type that can be used to
- * convert records into Java objects and vice versa using JAXB marshalling/unmarshalling
- * mechanisms. Reading the source will generate a {@code PCollection} of the given JAXB annotated
- * Java type. Optionally, users may provide a minimum size for the bundles created for the source.
- *
- * <p>The following example shows how to read from {@link XmlSource} in a Dataflow pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<Record> source = XmlSource.<Record>from(file.toPath().toString())
- *     .withRootElement("root")
- *     .withRecordElement("record")
- *     .withRecordClass(Record.class);
- * PCollection<Record> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- *   <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- *   <li>Include a compatible implementation on the classpath at run-time,
- *       such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies have been declared as optional in the Maven sdk/pom.xml file of
- * Google Cloud Dataflow.
- *
- * <p><h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- *        {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
-public class XmlSource<T> extends FileBasedSource<T> {
-
-  private static final String XML_VERSION = "1.1";
-  private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
-  private final String rootElement;
-  private final String recordElement;
-  private final Class<T> recordClass;
-
-  /**
-   * Returns a new XmlSource for a single XML file, or for the set of XML files matching a Java
-   * "glob" file pattern. The source starts out with the default minimum bundle size and with no
-   * root element, record element, or record class set. Each XML file should be of the form
-   * defined in {@link XmlSource}.
-   */
-  public static <T> XmlSource<T> from(String fileOrPatternSpec) {
-    final long minBundleSize = DEFAULT_MIN_BUNDLE_SIZE;
-    return new XmlSource<>(fileOrPatternSpec, minBundleSize, null, null, null);
-  }
-
-  /**
-   * Sets name of the root element of the XML document. This will be used to create a valid starting
-   * root element when initiating a bundle of records created from an XML document. This is a
-   * required parameter.
-   */
-  public XmlSource<T> withRootElement(String rootElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets name of the record element of the XML document. This will be used to determine offset of
-   * the first record of a bundle created from the XML document. This is a required parameter.
-   */
-  public XmlSource<T> withRecordElement(String recordElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
-   * will be used when unmarshalling record objects from the XML file.  This is a required
-   * parameter.
-   */
-  public XmlSource<T> withRecordClass(Class<T> recordClass) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
-   * to {@link OffsetBasedSource} for the definition of minBundleSize.  This is an optional
-   * parameter.
-   */
-  public XmlSource<T> withMinBundleSize(long minBundleSize) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Creates a source from a file or pattern and a minimum bundle size, both of which are passed
-   * to the superclass. The element names and record class are stored as given and may be
-   * {@code null} here; {@link #validate} rejects null values.
-   */
-  private XmlSource(
-      String fileOrPattern,
-      long minBundleSize,
-      String rootElement,
-      String recordElement,
-      Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize);
-    this.recordClass = recordClass;
-    this.recordElement = recordElement;
-    this.rootElement = rootElement;
-  }
-
-  /**
-   * Creates a source with explicit start and end offsets, which are passed to the superclass
-   * together with the file or pattern and the minimum bundle size. The element names and record
-   * class are stored as given.
-   */
-  private XmlSource(
-      String fileOrPattern,
-      long minBundleSize,
-      long startOffset,
-      long endOffset,
-      String rootElement,
-      String recordElement,
-      Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize, startOffset, endOffset);
-    this.recordClass = recordClass;
-    this.recordElement = recordElement;
-    this.rootElement = rootElement;
-  }
-
-  /**
-   * Creates an XmlSource for the given start and end offsets of {@code fileName}, carrying over
-   * this source's minimum bundle size, root element, record element, and record class.
-   */
-  @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    long bundleSize = getMinBundleSize();
-    XmlSource<T> subrange = new XmlSource<T>(
-        fileName, bundleSize, start, end, rootElement, recordElement, recordClass);
-    return subrange;
-  }
-
-  /**
-   * Creates an {@code XMLReader} that reads from this source.
-   */
-  @Override
-  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    XMLReader<T> reader = new XMLReader<T>(this);
-    return reader;
-  }
-
-  /**
-   * Always returns {@code false}: this source does not report its output as sorted by key.
-   */
-  @Override
-  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-    final boolean sorted = false;
-    return sorted;
-  }
-
-  /**
-   * Runs the superclass validation, then checks that the root element, the record element, and
-   * the record class (in that order) have all been set.
-   */
-  @Override
-  public void validate() {
-    super.validate();
-
-    final String missingRoot =
-        "rootElement is null. Use builder method withRootElement() to set this.";
-    Preconditions.checkNotNull(rootElement, missingRoot);
-
-    final String missingRecordElement =
-        "recordElement is null. Use builder method withRecordElement() to set this.";
-    Preconditions.checkNotNull(recordElement, missingRecordElement);
-
-    final String missingRecordClass =
-        "recordClass is null. Use builder method withRecordClass() to set this.";
-    Preconditions.checkNotNull(recordClass, missingRecordClass);
-  }
-
-  /**
-   * Returns a {@link JAXBCoder} for the record class of this source.
-   */
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    Coder<T> recordCoder = JAXBCoder.of(recordClass);
-    return recordCoder;
-  }
-
-  /**
-   * Returns the root element name this source was configured with.
-   */
-  public String getRootElement() {
-    return this.rootElement;
-  }
-
-  /**
-   * Returns the record element name this source was configured with.
-   */
-  public String getRecordElement() {
-    return this.recordElement;
-  }
-
-  /**
-   * Returns the record class this source was configured with.
-   */
-  public Class<T> getRecordClass() {
-    return this.recordClass;
-  }
-
-  /**
-   * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
-   * file should be of the form defined at {@link XmlSource}.
-   *
-   * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
-   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-   *
-   * @param <T> Type of objects that will be read by the reader.
-   */
-  private static class XMLReader<T> extends FileBasedReader<T> {
-    // The amount of bytes read from the channel to memory when determining the starting offset of
-    // the first record in a bundle. After matching to starting offset of the first record the
-    // remaining bytes read to this buffer and the bytes still not read from the channel are used to
-    // create the XML parser.
-    private static final int BUF_SIZE = 1024;
-
-    // This should be the maximum number of bytes a character will encode to, for any encoding
-    // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
-    // four bytes.
-    private static final int MAX_CHAR_BYTES = 4;
-
-    // In order to support reading starting in the middle of an XML file, we construct an imaginary
-    // well-formed document (a header and root tag followed by the contents of the input starting at
-    // the record boundary) and feed it to the parser. Because of this, the offset reported by the
-    // XML parser is not the same as offset in the original file. They differ by a constant amount:
-    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
-    // Note that this is true only for files with single-byte characters.
-    // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
-    // reporting byte offsets of elements in the presence of multi-byte characters.
-    private long parserBaseOffset = 0;
-    private boolean readingStarted = false;
-
-    // If true, the current bundle does not contain any records.
-    private boolean emptyBundle = false;
-
-    private Unmarshaller jaxbUnmarshaller = null;
-    private XMLStreamReader parser = null;
-
-    private T currentRecord = null;
-
-    // Byte offset of the current record in the XML file provided when creating the source.
-    private long currentByteOffset = 0;
-
-    public XMLReader(XmlSource<T> source) {
-      super(source);
-
-      // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
-      try {
-        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
-        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
-        // Throw errors if validation fails. JAXB by default ignores validation errors.
-        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
-          @Override
-          public boolean handleEvent(ValidationEvent event) {
-            throw new RuntimeException(event.getMessage(), event.getLinkedException());
-          }
-        });
-      } catch (JAXBException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public synchronized XmlSource<T> getCurrentSource() {
-      return (XmlSource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected void startReading(ReadableByteChannel channel) throws IOException {
-      // This method determines the correct starting offset of the first record by reading bytes
-      // from the ReadableByteChannel. This implementation does not need the channel to be a
-      // SeekableByteChannel.
-      // The method tries to determine the first record element in the byte channel. The first
-      // record must start with the characters "<recordElement" where "recordElement" is the
-      // record element of the XML document described above. For the match to be complete this
-      // has to be followed by one of following.
-      // * any whitespace character
-      // * '>' character
-      // * '/' character (to support empty records).
-      //
-      // After this match this method creates the XML parser for parsing the XML document,
-      // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
-      // by the contents of channel starting from <recordElement. The <rootElement> tag may be never
-      // closed.
-
-      // This stores any bytes that should be used prior to the remaining bytes of the channel when
-      // creating an XML parser object.
-      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
-      // A dummy declaration and root for the document with proper XML version and encoding. Without
-      // this XML parsing may fail or may produce incorrect results.
-
-      byte[] dummyStartDocumentBytes =
-          ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
-              + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
-      preambleByteBuffer.write(dummyStartDocumentBytes);
-      // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
-      // method returns the offset and stores any bytes that should be used when creating the XML
-      // parser in preambleByteBuffer.
-      long offsetInFileOfRecordElement =
-          getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
-      if (offsetInFileOfRecordElement < 0) {
-        // Bundle has no records. So marking this bundle as an empty bundle.
-        emptyBundle = true;
-        return;
-      } else {
-        byte[] preambleBytes = preambleByteBuffer.toByteArray();
-        currentByteOffset = offsetInFileOfRecordElement;
-        setUpXMLParser(channel, preambleBytes);
-        parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
-      }
-      readingStarted = true;
-    }
-
-    // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
-    // any bytes read past the starting offset of the next record back to the preambleByteBuffer.
-    // If a record is found, returns the starting offset of the record, otherwise
-    // returns -1.
-    private long getFirstOccurenceOfRecordElement(
-        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
-      int byteIndexInRecordElementToMatch = 0;
-      // Index of the byte in the string "<recordElement" to be matched
-      // against the current byte from the stream.
-      boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the
-      // next character to confirm if this is a positive match.
-      boolean fullyMatched = false; // If true, record element was fully matched.
-
-      // This gives the offset of the byte currently being read. We do a '-1' here since we
-      // increment this value at the beginning of the while loop below.
-      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
-      long startingOffsetInFileOfCurrentMatch = -1;
-      // If this is non-negative, currently there is a match in progress and this value gives the
-      // starting offset of the match currently being conducted.
-      boolean matchStarted = false; // If true, a match is currently in progress.
-
-      // These two values are used to determine the character immediately following a match for
-      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
-      byte[] charBytes = new byte[MAX_CHAR_BYTES];
-      int charBytesFound = 0;
-
-      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
-      byte[] recordStartBytes =
-          ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
-
-      outer: while (channel.read(buf) > 0) {
-        buf.flip();
-        while (buf.hasRemaining()) {
-          offsetInFileOfCurrentByte++;
-          byte b = buf.get();
-          boolean reset = false;
-          if (recordStartBytesMatched) {
-            // We already matched "<recordElement" reading the next character to determine if this
-            // is a positive match for a new record.
-            charBytes[charBytesFound] = b;
-            charBytesFound++;
-            Character c = null;
-            if (charBytesFound == charBytes.length) {
-              CharBuffer charBuf = CharBuffer.allocate(1);
-              InputStream charBufStream = new ByteArrayInputStream(charBytes);
-              java.io.Reader reader =
-                  new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
-              int read = reader.read();
-              if (read <= 0) {
-                return -1;
-              }
-              charBuf.flip();
-              c = (char) read;
-            } else {
-              continue;
-            }
-
-            // Record start may be of following forms
-            // * "<recordElement<whitespace>..."
-            // * "<recordElement>..."
-            // * "<recordElement/..."
-            if (Character.isWhitespace(c) || c == '>' || c == '/') {
-              fullyMatched = true;
-              // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
-              // already read from the channel.
-              preambleByteBuffer.write(recordStartBytes);
-              preambleByteBuffer.write(charBytes);
-              // Also add the rest of the current buffer to preambleByteBuffer.
-              while (buf.hasRemaining()) {
-                preambleByteBuffer.write(buf.get());
-              }
-              break outer;
-            } else {
-              // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
-              ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
-              newbuf.put(charBytes);
-              offsetInFileOfCurrentByte -= charBytes.length;
-              while (buf.hasRemaining()) {
-                newbuf.put(buf.get());
-              }
-              newbuf.flip();
-              buf = newbuf;
-
-              // Ignore everything and try again starting from the current buffer.
-              reset = true;
-            }
-          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
-            // Next byte matched.
-            if (!matchStarted) {
-              // Match was for the first byte, record the starting offset.
-              matchStarted = true;
-              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
-            }
-            byteIndexInRecordElementToMatch++;
-          } else {
-            // Not a match. Ignore everything and try again starting at current point.
-            reset = true;
-          }
-          if (reset) {
-            // Clear variables and try to match starting from the next byte.
-            byteIndexInRecordElementToMatch = 0;
-            startingOffsetInFileOfCurrentMatch = -1;
-            matchStarted = false;
-            recordStartBytesMatched = false;
-            charBytes = new byte[MAX_CHAR_BYTES];
-            charBytesFound = 0;
-          }
-          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
-            // "<recordElement" matched. Need to still check next byte since this might be an
-            // element that has "recordElement" as a prefix.
-            recordStartBytesMatched = true;
-          }
-        }
-        buf.clear();
-      }
-
-      if (!fullyMatched) {
-        return -1;
-      } else {
-        return startingOffsetInFileOfCurrentMatch;
-      }
-    }
-
-    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
-      try {
-        // We use Woodstox because the StAX implementation provided by OpenJDK reports
-        // character locations incorrectly. Note that Woodstox still currently reports *byte*
-        // locations incorrectly when parsing documents that contain multi-byte characters.
-        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
-        this.parser = xmlInputFactory.createXMLStreamReader(
-            new SequenceInputStream(
-                new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
-            "UTF-8");
-
-        // Current offset should be the offset before reading the record element.
-        while (true) {
-          int event = parser.next();
-          if (event == XMLStreamConstants.START_ELEMENT) {
-            String localName = parser.getLocalName();
-            if (localName.equals(getCurrentSource().recordElement)) {
-              break;
-            }
-          }
-        }
-      } catch (FactoryConfigurationError | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    protected boolean readNextRecord() throws IOException {
-      if (emptyBundle) {
-        currentByteOffset = Long.MAX_VALUE;
-        return false;
-      }
-      try {
-        // Update current offset and check if the next value is the record element.
-        currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
-          parser.next();
-          currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
-            currentByteOffset = Long.MAX_VALUE;
-            return false;
-          }
-        }
-        JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
-        currentRecord = jb.getValue();
-        return true;
-      } catch (JAXBException | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (!readingStarted) {
-        throw new NoSuchElementException();
-      }
-      return currentRecord;
-    }
-
-    @Override
-    protected boolean isAtSplitPoint() {
-      // Every record is at a split point.
-      return true;
-    }
-
-    @Override
-    protected long getCurrentOffset() {
-      return currentByteOffset;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 7d59b09..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,987 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.io.range.ByteKey;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table")
- *         .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- *     BigtableIO.write()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
-   * initialized with a
-   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
-   * specifies which table to read. A {@link RowFilter} may also optionally be specified using
-   * {@link BigtableIO.Read#withRowFilter}.
-   */
-  @Experimental
-  public static Read read() {
-    return new Read(null, "", null, null);
-  }
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
-   * initialized with a
-   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
-   * specifies which table to write.
-   */
-  @Experimental
-  public static Write write() {
-    return new Write(null, "", null);
-  }
-
-  /**
-   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Read extends PTransform<PBegin, PCollection<Row>> {
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Read}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Read(optionsWithAgent, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
-     * using the given row filter.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withRowFilter(RowFilter filter) {
-      checkNotNull(filter, "filter");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being read from.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PCollection<Row> apply(PBegin input) {
-      BigtableSource source =
-          new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
-      return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Read.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .toString();
-    }
-
-    /////////////////////////////////////////////////////////////////////////////////////////
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final RowFilter filter;
-    @Nullable private final BigtableService bigtableService;
-
-    private Read(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable RowFilter filter,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.filter = filter;
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Read withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * {@code Cloud Bigtable}.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Write
-      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final BigtableService bigtableService;
-
-    private Write(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Write}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Write(optionsWithAgent, tableId, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being written to.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      Sink sink = new Sink(tableId, getBigtableService());
-      return input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink));
-    }
-
-    @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Write withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Write.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .toString();
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * {@code Cloud Bigtable}.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////
-  /** Disallow construction of utility class. */
-  private BigtableIO() {}
-
-  /**
-   * A {@link BoundedSource} that reads the {@link Row}s of one Cloud Bigtable table whose keys
-   * fall within a {@link ByteKeyRange}.
-   *
-   * <p>Both the size estimate and the initial splits are derived from the row-key samples
-   * returned by {@link BigtableService#getSampleRowKeys}.
-   */
-  static class BigtableSource extends BoundedSource<Row> {
-    /**
-     * Creates a source for the given table and key range.
-     *
-     * @param service the Bigtable service used to sample row keys and to create readers
-     * @param tableId the id of the table to read; checked in {@link #validate}
-     * @param filter an optional row filter, exposed through {@link #getRowFilter}
-     * @param range the key range covered by this source
-     * @param estimatedSizeBytes a known size estimate, or {@code null} to have
-     *     {@link #getEstimatedSizeBytes} compute one from row-key samples on first use
-     */
-    public BigtableSource(
-        BigtableService service,
-        String tableId,
-        @Nullable RowFilter filter,
-        ByteKeyRange range,
-        Long estimatedSizeBytes) {
-      this.service = service;
-      this.tableId = tableId;
-      this.filter = filter;
-      this.range = range;
-      this.estimatedSizeBytes = estimatedSizeBytes;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(BigtableSource.class)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .add("range", range)
-          .add("estimatedSizeBytes", estimatedSizeBytes)
-          .toString();
-    }
-
-    ////// Private state and internal implementation details //////
-    private final BigtableService service;
-    @Nullable private final String tableId;
-    @Nullable private final RowFilter filter;
-    private final ByteKeyRange range;
-    // Filled in by getEstimatedSizeBytes() on first use when not supplied at construction.
-    @Nullable private Long estimatedSizeBytes;
-    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
-
-    /**
-     * Returns a copy of this source whose range starts at {@code startKey}. The copy carries
-     * over this source's current size estimate unchanged.
-     */
-    protected BigtableSource withStartKey(ByteKey startKey) {
-      checkNotNull(startKey, "startKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
-    }
-
-    /**
-     * Returns a copy of this source whose range ends at {@code endKey}. The copy carries over
-     * this source's current size estimate unchanged.
-     */
-    protected BigtableSource withEndKey(ByteKey endKey) {
-      checkNotNull(endKey, "endKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
-    }
-
-    /** Returns a copy of this source with the given, non-null, size estimate. */
-    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
-      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
-      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
-    }
-
-    /**
-     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
-     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
-     * different tablets, and possibly generate sub-splits within tablets.
-     */
-    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
-      return service.getSampleRowKeys(this);
-    }
-
-    /**
-     * Splits this source into bundles using the sampled row keys of the table.
-     *
-     * <p>The requested bundle size is raised, if necessary, so that the estimated size divided
-     * by the bundle size does not exceed 4000.
-     */
-    @Override
-    public List<BigtableSource> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      // Update the desiredBundleSizeBytes in order to limit the
-      // number of splits to maximumNumberOfSplits.
-      long maximumNumberOfSplits = 4000;
-      long sizeEstimate = getEstimatedSizeBytes(options);
-      desiredBundleSizeBytes =
-          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
-
-      // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
-    }
-
-    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
-    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
-        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
-      // There are no regions, or no samples available. Just scan the entire range.
-      if (sampleRowKeys.isEmpty()) {
-        logger.info("Not splitting source {} because no sample row keys are available.", this);
-        return Collections.singletonList(this);
-      }
-
-      logger.info(
-          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
-          desiredBundleSizeBytes,
-          sampleRowKeys.size(),
-          sampleRowKeys.get(0));
-
-      // Loop through all sampled responses and generate splits from the ones that overlap the
-      // scan range. The main complication is that we must track the end range of the previous
-      // sample to generate good ranges.
-      ByteKey lastEndKey = ByteKey.EMPTY;
-      long lastOffset = 0;
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      for (SampleRowKeysResponse response : sampleRowKeys) {
-        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
-        long responseOffset = response.getOffsetBytes();
-        checkState(
-            responseOffset >= lastOffset,
-            "Expected response byte offset %s to come after the last offset %s",
-            responseOffset,
-            lastOffset);
-
-        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
-          // This region does not overlap the scan, so skip it.
-          lastOffset = responseOffset;
-          lastEndKey = responseEndKey;
-          continue;
-        }
-
-        // Calculate the beginning of the split as the larger of startKey and the end of the last
-        // split. Unspecified start is smallest key so is correctly treated as earliest key.
-        ByteKey splitStartKey = lastEndKey;
-        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
-          splitStartKey = range.getStartKey();
-        }
-
-        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
-        // that range.containsKey handles the case when range.getEndKey() is empty.
-        ByteKey splitEndKey = responseEndKey;
-        if (!range.containsKey(splitEndKey)) {
-          splitEndKey = range.getEndKey();
-        }
-
-        // We know this region overlaps the desired key range, and we know a rough estimate of its
-        // size. Split the key range into bundle-sized chunks and then add them all as splits.
-        long sampleSizeBytes = responseOffset - lastOffset;
-        List<BigtableSource> subSplits =
-            splitKeyRangeIntoBundleSizedSubranges(
-                sampleSizeBytes,
-                desiredBundleSizeBytes,
-                ByteKeyRange.of(splitStartKey, splitEndKey));
-        splits.addAll(subSplits);
-
-        // Move to the next region.
-        lastEndKey = responseEndKey;
-        lastOffset = responseOffset;
-      }
-
-      // We must add one more region after the end of the samples if both these conditions hold:
-      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
-      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
-      // No sample covers this tail region, so the split keeps the size estimate carried by this
-      // source.
-      if (!lastEndKey.isEmpty()
-          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
-        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
-      }
-
-      List<BigtableSource> ret = splits.build();
-      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
-      return ret;
-    }
-
-    /**
-     * Returns the estimated size of this source in bytes. When no estimate is known yet, one is
-     * computed from the table's row-key samples and remembered for later calls.
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
-      // Delegate to testable helper.
-      if (estimatedSizeBytes == null) {
-        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Computes the estimated size in bytes based on the total size of all samples that overlap
-     * the key range this source will scan.
-     */
-    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
-      long estimatedSizeBytes = 0;
-      long lastOffset = 0;
-      ByteKey currentStartKey = ByteKey.EMPTY;
-      // Compute the total estimated size as the size of each sample that overlaps the scan range.
-      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
-      // filter or to sample on a given key range.
-      for (SampleRowKeysResponse response : samples) {
-        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
-        long currentOffset = response.getOffsetBytes();
-        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
-          // Skip an empty region.
-          lastOffset = currentOffset;
-          continue;
-        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
-          estimatedSizeBytes += currentOffset - lastOffset;
-        }
-        currentStartKey = currentEndKey;
-        lastOffset = currentOffset;
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Cloud Bigtable returns query results ordered by key.
-     */
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    /** Creates a {@link BigtableReader} over this source that uses this source's service. */
-    @Override
-    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
-      return new BigtableReader(this, service);
-    }
-
-    /**
-     * Checks that a non-empty table id was supplied.
-     *
-     * @throws IllegalArgumentException if {@code tableId} is {@code null} or empty
-     */
-    @Override
-    public void validate() {
-      // tableId is declared @Nullable, so test for null before calling isEmpty() in order to
-      // report a descriptive IllegalArgumentException rather than a bare NullPointerException.
-      checkArgument(tableId != null && !tableId.isEmpty(), "tableId cannot be empty");
-    }
-
-    @Override
-    public Coder<Row> getDefaultOutputCoder() {
-      return Proto2Coder.of(Row.class);
-    }
-
-    /**
-     * Helper that splits the specified range in this source into bundles.
-     *
-     * @param sampleSizeBytes the estimated size of the sampled region containing {@code range}
-     * @param desiredBundleSizeBytes the target size of each produced split
-     * @param range the key range to split
-     * @return one split when the region is no larger than the desired bundle size, otherwise the
-     *     sub-splits between consecutive keys returned by {@code range.split}
-     */
-    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
-        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
-      // Catch the trivial cases. Split is small enough already, or this is the last region.
-      logger.debug(
-          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
-          sampleSizeBytes,
-          desiredBundleSizeBytes);
-      if (sampleSizeBytes <= desiredBundleSizeBytes) {
-        // Record the region's own size on the split. Without this the split would inherit this
-        // source's estimate, which describes the whole source rather than this one region.
-        return Collections.singletonList(
-            this
-                .withStartKey(range.getStartKey())
-                .withEndKey(range.getEndKey())
-                .withEstimatedSizeBytes(sampleSizeBytes));
-      }
-
-      checkArgument(
-          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
-      checkArgument(
-          desiredBundleSizeBytes > 0,
-          "Desired bundle size %s bytes must be greater than 0.",
-          desiredBundleSizeBytes);
-
-      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
-      List<ByteKey> splitKeys = range.split(splitCount);
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      // Each pair of consecutive split keys bounds one sub-split.
-      Iterator<ByteKey> keys = splitKeys.iterator();
-      ByteKey prev = keys.next();
-      while (keys.hasNext()) {
-        ByteKey next = keys.next();
-        splits.add(
-            this
-                .withStartKey(prev)
-                .withEndKey(next)
-                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
-        prev = next;
-      }
-      return splits.build();
-    }
-
-    /** Returns the key range covered by this source. */
-    public ByteKeyRange getRange() {
-      return range;
-    }
-
-    /** Returns the row filter given at construction, which may be {@code null}. */
-    @Nullable
-    public RowFilter getRowFilter() {
-      return filter;
-    }
-
-    /** Returns the id of the table this source reads. */
-    public String getTableId() {
-      return tableId;
-    }
-  }
-
-  /**
-   * A {@link BoundedReader} over a {@link BigtableSource} that delegates row retrieval to a
-   * {@link BigtableService.Reader} and tracks its position with a {@link ByteKeyRangeTracker}.
-   */
-  private static class BigtableReader extends BoundedReader<Row> {
-    // Thread-safety: source is protected via synchronization and is only accessed or modified
-    // inside a synchronized block (or constructor, which is the same).
-    private BigtableSource source;
-    private BigtableService service;
-    private BigtableService.Reader reader;
-    private final ByteKeyRangeTracker rangeTracker;
-    private long recordsReturned;
-
-    /**
-     * Creates a reader for {@code source} that obtains its underlying reader from
-     * {@code service} when {@link #start} is called.
-     */
-    public BigtableReader(BigtableSource source, BigtableService service) {
-      this.source = source;
-      this.service = service;
-      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
-    }
-
-    /**
-     * Opens the underlying reader and reads the first row.
-     *
-     * @return {@code true} if a row was read and the range tracker accepted its key
-     */
-    @Override
-    public boolean start() throws IOException {
-      reader = service.createReader(getCurrentSource());
-      boolean hasRecord =
-          reader.start()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    @Override
-    public synchronized BigtableSource getCurrentSource() {
-      return source;
-    }
-
-    /**
-     * Reads the next row.
-     *
-     * @return {@code true} if a row was read and the range tracker accepted its key
-     */
-    @Override
-    public boolean advance() throws IOException {
-      boolean hasRecord =
-          reader.advance()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    /** Returns the current row of the underlying reader. */
-    @Override
-    public Row getCurrent() throws NoSuchElementException {
-      return reader.getCurrentRow();
-    }
-
-    /** Logs the number of records returned and closes the underlying reader, if one is open. */
-    @Override
-    public void close() throws IOException {
-      logger.info("Closing reader after reading {} records.", recordsReturned);
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-
-    @Override
-    public final Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    /**
-     * Attempts to split this reader's source at the given fraction of its key range.
-     *
-     * @return the residual source starting at the split key, or {@code null} if no key could be
-     *     interpolated for {@code fraction} or the range tracker rejected the split position
-     */
-    @Override
-    public final synchronized BigtableSource splitAtFraction(double fraction) {
-      ByteKey splitKey;
-      try {
-        splitKey = source.getRange().interpolateKey(fraction);
-      } catch (IllegalArgumentException e) {
-        // The logger substitutes "{}" placeholders (as every other call in this file uses);
-        // printf-style "%s" markers would be emitted literally.
-        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
-        return null;
-      }
-      logger.debug(
-          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
-      if (!rangeTracker.trySplitAtPosition(splitKey)) {
-        return null;
-      }
-      // This reader keeps the part before the split key; the caller receives the remainder.
-      BigtableSource primary = source.withEndKey(splitKey);
-      BigtableSource residual = source.withStartKey(splitKey);
-      this.source = primary;
-      return residual;
-    }
-  }
-
-  /**
-   * A sink that records the destination table id and the {@link BigtableService} to write
-   * through, and creates {@link BigtableWriteOperation}s bound to itself.
-   */
-  private static class Sink
-      extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-    ///////////////////////////////////////////////////////////////////////////////
-    private final String tableId;
-    private final BigtableService bigtableService;
-
-    /** Creates a sink for {@code tableId}; both arguments must be non-null. */
-    public Sink(String tableId, BigtableService bigtableService) {
-      checkNotNull(tableId, "tableId");
-      checkNotNull(bigtableService, "bigtableService");
-      this.tableId = tableId;
-      this.bigtableService = bigtableService;
-    }
-
-    /** Returns the id of the table written to. */
-    public String getTableId() {
-      return tableId;
-    }
-
-    /** Returns the service used to open writers. */
-    public BigtableService getBigtableService() {
-      return bigtableService;
-    }
-
-    @Override
-    public String toString() {
-      MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(Sink.class);
-      description.add("bigtableService", bigtableService);
-      description.add("tableId", tableId);
-      return description.toString();
-    }
-
-    /** Returns a new {@link BigtableWriteOperation} for this sink; the options are unused. */
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
-        PipelineOptions options) {
-      return new BigtableWriteOperation(this);
-    }
-
-    /** Does nothing, as it is redundant with {@link Write#validate}. */
-    @Override
-    public void validate(PipelineOptions options) {
-      // Intentionally empty.
-    }
-  }
-
-  /**
-   * The {@link WriteOperation} for a {@link Sink}. It needs no initialization, creates a new
-   * {@link BigtableWriter} on each call to {@link #createWriter}, and on finalization only logs
-   * the total number of elements reported by the writers.
-   */
-  private static class BigtableWriteOperation
-      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final Sink sink;
-
-    /** Creates a write operation for {@code sink}. */
-    public BigtableWriteOperation(Sink sink) {
-      this.sink = sink;
-    }
-
-    /** Returns a new {@link BigtableWriter} bound to this operation; the options are unused. */
-    @Override
-    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
-        throws Exception {
-      return new BigtableWriter(this);
-    }
-
-    /** Does nothing. */
-    @Override
-    public void initialize(PipelineOptions options) {}
-
-    /**
-     * Sums the per-writer record counts and logs the total at debug level.
-     *
-     * @param writerResults the values returned by each writer's {@code close()}
-     */
-    @Override
-    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
-      long count = 0;
-      for (long value : writerResults) {
-        // Accumulate into the running total (the loop variable itself must not be the target).
-        count += value;
-      }
-      // Supply one argument for each "{}" placeholder: the total, then the sink.
-      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
-    }
-
-    @Override
-    public Sink getSink() {
-      return sink;
-    }
-
-    /** Writer results are {@code Long} record counts, encoded with {@link VarLongCoder}. */
-    @Override
-    public Coder<Long> getWriterResultCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * A {@link Writer} that submits each element through a {@link BigtableService.Writer},
-   * collects failed writes reported by the returned futures, and surfaces them as an
-   * {@link IOException} on the next {@link #write} or on {@link #close}.
-   */
-  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final BigtableWriteOperation writeOperation;
-    private final Sink sink;
-    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-    private BigtableService.Writer bigtableWriter;
-    private long recordsWritten;
-
-    /** Creates a writer for the sink of {@code writeOperation}. */
-    public BigtableWriter(BigtableWriteOperation writeOperation) {
-      this.failures = new ConcurrentLinkedQueue<>();
-      this.writeOperation = writeOperation;
-      this.sink = writeOperation.getSink();
-    }
-
-    /** Opens a service writer for the sink's table and resets the record counter. */
-    @Override
-    public void open(String uId) throws Exception {
-      BigtableService service = sink.getBigtableService();
-      bigtableWriter = service.openForWriting(sink.getTableId());
-      recordsWritten = 0;
-    }
-
-    /**
-     * Throws an {@link IOException} summarizing up to ten recorded write failures, if any write
-     * has asynchronously failed; otherwise returns normally.
-     */
-    private void checkForFailures() throws IOException {
-      // Note that this function is never called by multiple threads and is the only place that
-      // we remove from failures, so this code is safe.
-      if (!failures.isEmpty()) {
-        StringBuilder details = new StringBuilder();
-        int reported = 0;
-        while (reported < 10 && !failures.isEmpty()) {
-          BigtableWriteException failure = failures.remove();
-          details.append("\n").append(failure.getMessage());
-          Throwable cause = failure.getCause();
-          if (cause != null) {
-            details.append(": ").append(cause.getMessage());
-          }
-          ++reported;
-        }
-        // Failures still queued after the first ten are counted but not listed.
-        String message =
-            String.format(
-                "At least %d errors occurred writing to Bigtable. First %d errors: %s",
-                reported + failures.size(),
-                reported,
-                details.toString());
-        logger.error(message);
-        throw new IOException(message);
-      }
-    }
-
-    /**
-     * Fails if an earlier write has failed; otherwise submits {@code rowMutations} and registers
-     * a callback that records the record should its write fail.
-     */
-    @Override
-    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
-      checkForFailures();
-      Futures.addCallback(
-          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
-      recordsWritten += 1;
-    }
-
-    /**
-     * Closes the service writer, fails if any write has failed, and otherwise returns the number
-     * of records submitted since {@link #open}.
-     */
-    @Override
-    public Long close() throws Exception {
-      bigtableWriter.close();
-      bigtableWriter = null;
-      checkForFailures();
-      long total = recordsWritten;
-      logger.info("Wrote {} records", total);
-      return total;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
-      return writeOperation;
-    }
-
-    /** Callback that queues a {@link BigtableWriteException} when a record's write fails. */
-    private class WriteExceptionCallback implements FutureCallback<Empty> {
-      private final KV<ByteString, Iterable<Mutation>> failedRecord;
-
-      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
-        this.failedRecord = value;
-      }
-
-      @Override
-      public void onFailure(Throwable cause) {
-        failures.add(new BigtableWriteException(failedRecord, cause));
-      }
-
-      @Override
-      public void onSuccess(Empty produced) {
-        // Nothing to record for a successful write.
-      }
-    }
-  }
-
-  /**
-   * An {@link IOException} whose message identifies the record that failed to be written: the
-   * row key (decoded as UTF-8) and the mutations that were attempted.
-   */
-  static class BigtableWriteException extends IOException {
-    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
-      super(describe(record), cause);
-    }
-
-    /** Formats the exception message for the failed {@code record}. */
-    private static String describe(KV<ByteString, Iterable<Mutation>> record) {
-      String rowKey = record.getKey().toStringUtf8();
-      return String.format(
-          "Error mutating row %s with mutations %s", rowKey, record.getValue());
-    }
-  }
-
-  /**
-   * Builds the Cloud Bigtable user agent string in the form
-   * {@code <name>/<version> (<java specification version>); <Bigtable client version>}, where
-   * name and version come from {@link DataflowReleaseInfo}.
-   */
-  private static String getUserAgent() {
-    String javaSpecVersion = System.getProperty("java.specification.version");
-    DataflowReleaseInfo releaseInfo = DataflowReleaseInfo.getReleaseInfo();
-    // TODO get Bigtable client version directly from jar.
-    String bigtableClientVersion = "0.2.3";
-    return String.format(
-        "%s/%s (%s); %s",
-        releaseInfo.getName(),
-        releaseInfo.getVersion(),
-        javaSpecVersion,
-        bigtableClientVersion);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index 85d706c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An interface for real or fake implementations of the Cloud Bigtable operations declared here.
- */
-interface BigtableService extends Serializable {
-
-  /**
-   * The interface of a class that can write to Cloud Bigtable.
-   */
-  interface Writer {
-    /**
-     * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
-     * row key to be mutated and the iterable of mutations represents the changes to be made to
-     * that row. The result of the write is delivered through the returned future.
-     *
-     * @throws IOException if there is an error submitting the write.
-     */
-    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException;
-
-    /**
-     * Closes the writer.
-     *
-     * @throws IOException if any writes did not succeed
-     */
-    void close() throws IOException;
-  }
-
-  /**
-   * The interface of a class that reads from Cloud Bigtable.
-   */
-  interface Reader {
-    /**
-     * Reads the first element (including initialization, such as opening a network connection) and
-     * returns {@code true} if an element was found.
-     */
-    boolean start() throws IOException;
-
-    /**
-     * Attempts to read the next element, and returns {@code true} if an element has been read.
-     */
-    boolean advance() throws IOException;
-
-    /**
-     * Closes the reader.
-     *
-     * @throws IOException if there is an error.
-     */
-    void close() throws IOException;
-
-    /**
-     * Returns the last row read by a successful {@code start()} or {@code advance()}, or throws
-     * if there is no current row because the last such call was unsuccessful.
-     */
-    Row getCurrentRow() throws NoSuchElementException;
-  }
-
-  /**
-   * Returns {@code true} if the table with the given name exists.
-   */
-  boolean tableExists(String tableId) throws IOException;
-
-  /**
-   * Returns a {@link Reader} that will read from the specified source.
-   */
-  Reader createReader(BigtableSource source) throws IOException;
-
-  /**
-   * Returns a {@link Writer} that will write to the specified table.
-   */
-  Writer openForWriting(String tableId) throws IOException;
-
-  /**
-   * Returns a list of row keys sampled from the table underlying the specified source. These
-   * contain information about the distribution of keys within the table.
-   */
-  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-}