Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:47:59 UTC

[12/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
deleted file mode 100644
index eae5e8b..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.common.base.Preconditions;
-
-import org.codehaus.stax2.XMLInputFactory2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.ValidationEvent;
-import javax.xml.bind.ValidationEventHandler;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@code PCollection} of a given type. A Dataflow read transform can be
- * created by passing an {@code XmlSource} object to {@code Read.from()}. See the example
- * given below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>The XML document should contain a single root element with an inner list consisting
- * entirely of record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Users must also provide a class for a JAXB-annotated Java type that can be used to convert
- * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading
- * the source will generate a {@code PCollection} of the given JAXB-annotated Java type.
- * Optionally, users may provide a minimum size for the bundles created from the source.
- *
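- * <p>For instance, a minimal JAXB-annotated type for the {@code record} element shown above
- * might look like the following sketch (the field name is illustrative):
- *
- * <pre>
- * {@code
- * @XmlRootElement(name = "record")
- * class Record {
- *   @XmlElement
- *   String value;
- * }
- * }
- * </pre>
- *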
- * <p>The following example shows how to read from {@link XmlSource} in a Dataflow pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<Record> source = XmlSource.<Record>from(file.toPath().toString())
- *     .withRootElement("root")
- *     .withRecordElement("record")
- *     .withRecordClass(Record.class);
- * PCollection<Record> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- *   <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- *   <li>Include a compatible implementation on the classpath at run-time,
- *       such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies are declared as optional in the Maven sdk/pom.xml file of the
- * Google Cloud Dataflow SDK.
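- *
- * <p>For example, the corresponding declarations in a user's own pom.xml might look like the
- * following sketch (the version numbers are illustrative):
- *
- * <pre>
- * {@code
- * <dependency>
- *   <groupId>org.codehaus.woodstox</groupId>
- *   <artifactId>stax2-api</artifactId>
- *   <version>3.1.4</version>
- * </dependency>
- * <dependency>
- *   <groupId>org.codehaus.woodstox</groupId>
- *   <artifactId>woodstox-core-asl</artifactId>
- *   <version>4.4.1</version>
- *   <scope>runtime</scope>
- * </dependency>
- * }
- * </pre>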
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- *        {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
-public class XmlSource<T> extends FileBasedSource<T> {
-
-  private static final String XML_VERSION = "1.1";
-  private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
-  private final String rootElement;
-  private final String recordElement;
-  private final Class<T> recordClass;
-
-  /**
-   * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
-   * pattern. Each XML file should be of the form defined in {@link XmlSource}.
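-   *
-   * <p>For example (a sketch; the file pattern is illustrative):
-   * {@code XmlSource.<Record>from("/path/to/records-*.xml")}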
-   */
-  public static <T> XmlSource<T> from(String fileOrPatternSpec) {
-    return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
-  }
-
-  /**
-   * Sets the name of the root element of the XML document. This will be used to create a valid starting
-   * root element when initiating a bundle of records created from an XML document. This is a
-   * required parameter.
-   */
-  public XmlSource<T> withRootElement(String rootElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets the name of the record element of the XML document. This will be used to determine the
-   * offset of the first record of a bundle created from the XML document. This is a required
-   * parameter.
-   */
-  public XmlSource<T> withRecordElement(String recordElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets the JAXB-annotated class that can be populated using a record of the provided XML file. This
-   * will be used when unmarshalling record objects from the XML file.  This is a required
-   * parameter.
-   */
-  public XmlSource<T> withRecordClass(Class<T> recordClass) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
-
-  /**
-   * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
-   * to {@link OffsetBasedSource} for the definition of minBundleSize.  This is an optional
-   * parameter.
-   */
-  public XmlSource<T> withMinBundleSize(long minBundleSize) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
-  }
-
-  private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
-      String recordElement, Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize);
-    this.rootElement = rootElement;
-    this.recordElement = recordElement;
-    this.recordClass = recordClass;
-  }
-
-  private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
-      String rootElement, String recordElement, Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize, startOffset, endOffset);
-    this.rootElement = rootElement;
-    this.recordElement = recordElement;
-    this.recordClass = recordClass;
-  }
-
-  @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new XmlSource<T>(
-        fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
-  }
-
-  @Override
-  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    return new XMLReader<T>(this);
-  }
-
-  @Override
-  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-    return false;
-  }
-
-  @Override
-  public void validate() {
-    super.validate();
-    Preconditions.checkNotNull(
-        rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
-    Preconditions.checkNotNull(
-        recordElement,
-        "recordElement is null. Use builder method withRecordElement() to set this.");
-    Preconditions.checkNotNull(
-        recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return JAXBCoder.of(recordClass);
-  }
-
-  public String getRootElement() {
-    return rootElement;
-  }
-
-  public String getRecordElement() {
-    return recordElement;
-  }
-
-  public Class<T> getRecordClass() {
-    return recordClass;
-  }
-
-  /**
-   * A {@link Source.Reader} for reading JAXB-annotated Java objects from an XML file. The XML
-   * file should be of the form defined in {@link XmlSource}.
-   *
-   * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
-   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-   *
-   * @param <T> Type of objects that will be read by the reader.
-   */
-  private static class XMLReader<T> extends FileBasedReader<T> {
-    // The number of bytes read from the channel into memory when determining the starting offset
-    // of the first record in a bundle. After matching the starting offset of the first record, the
-    // remaining bytes read into this buffer and the bytes still unread from the channel are used
-    // to create the XML parser.
-    private static final int BUF_SIZE = 1024;
-
-    // This should be the maximum number of bytes a character will encode to, for any encoding
-    // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be up to
-    // four bytes long.
-    private static final int MAX_CHAR_BYTES = 4;
-
-    // In order to support reading starting in the middle of an XML file, we construct an imaginary
-    // well-formed document (a header and root tag followed by the contents of the input starting at
-    // the record boundary) and feed it to the parser. Because of this, the offset reported by the
-    // XML parser is not the same as offset in the original file. They differ by a constant amount:
-    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
-    // Note that this is true only for files with single-byte characters.
-    // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
-    // reporting byte offsets of elements in the presence of multi-byte characters.
-    private long parserBaseOffset = 0;
-    private boolean readingStarted = false;
-
-    // If true, the current bundle does not contain any records.
-    private boolean emptyBundle = false;
-
-    private Unmarshaller jaxbUnmarshaller = null;
-    private XMLStreamReader parser = null;
-
-    private T currentRecord = null;
-
-    // Byte offset of the current record in the XML file provided when creating the source.
-    private long currentByteOffset = 0;
-
-    public XMLReader(XmlSource<T> source) {
-      super(source);
-
-      // Set up a JAXB Unmarshaller that can be used to unmarshal record objects.
-      try {
-        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
-        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
-        // Throw errors if validation fails. JAXB by default ignores validation errors.
-        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
-          @Override
-          public boolean handleEvent(ValidationEvent event) {
-            throw new RuntimeException(event.getMessage(), event.getLinkedException());
-          }
-        });
-      } catch (JAXBException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public synchronized XmlSource<T> getCurrentSource() {
-      return (XmlSource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected void startReading(ReadableByteChannel channel) throws IOException {
-      // This method determines the correct starting offset of the first record by reading bytes
-      // from the ReadableByteChannel. This implementation does not need the channel to be a
-      // SeekableByteChannel.
-      // The method tries to determine the first record element in the byte channel. The first
-      // record must start with the characters "<recordElement", where "recordElement" is the
-      // record element of the XML document described above. For the match to be complete, this
-      // has to be followed by one of the following:
-      // * any whitespace character
-      // * the '>' character
-      // * the '/' character (to support empty records).
-      //
-      // After this match, this method creates the XML parser for parsing the XML document,
-      // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
-      // by the contents of the channel starting from <recordElement. The <rootElement> tag may
-      // never be closed.
-
-      // This stores any bytes that should be used prior to the remaining bytes of the channel when
-      // creating an XML parser object.
-      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
-      // A dummy declaration and root for the document with the proper XML version and encoding.
-      // Without this, XML parsing may fail or produce incorrect results.
-
-      byte[] dummyStartDocumentBytes =
-          ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
-              + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
-      preambleByteBuffer.write(dummyStartDocumentBytes);
-      // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
-      // method returns the offset and stores any bytes that should be used when creating the XML
-      // parser in preambleByteBuffer.
-      long offsetInFileOfRecordElement =
-          getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
-      if (offsetInFileOfRecordElement < 0) {
-        // The bundle has no records, so mark it as empty.
-        emptyBundle = true;
-        return;
-      } else {
-        byte[] preambleBytes = preambleByteBuffer.toByteArray();
-        currentByteOffset = offsetInFileOfRecordElement;
-        setUpXMLParser(channel, preambleBytes);
-        parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
-      }
-      readingStarted = true;
-    }
-
-    // Finds the first occurrence of the next record within the given ReadableByteChannel, writing
-    // any bytes read past the starting offset of that record into the preambleByteBuffer.
-    // If a record is found, returns the starting offset of the record; otherwise
-    // returns -1.
-    private long getFirstOccurenceOfRecordElement(
-        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
-      // Index of the byte in the string "<recordElement" to be matched
-      // against the current byte from the stream.
-      int byteIndexInRecordElementToMatch = 0;
-      // "<recordElement" matched. Still have to match the next character to confirm
-      // that this is a positive match.
-      boolean recordStartBytesMatched = false;
-      // If true, the record element was fully matched.
-      boolean fullyMatched = false;
-
-      // This gives the offset of the byte currently being read. We do a '-1' here since we
-      // increment this value at the beginning of the while loop below.
-      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
-      // If this is non-negative, a match is currently in progress and this value gives the
-      // starting offset of that match.
-      long startingOffsetInFileOfCurrentMatch = -1;
-      // If true, a match is currently in progress.
-      boolean matchStarted = false;
-
-      // These two values are used to determine the character immediately following a match for
-      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
-      byte[] charBytes = new byte[MAX_CHAR_BYTES];
-      int charBytesFound = 0;
-
-      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
-      byte[] recordStartBytes =
-          ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
-
-      outer: while (channel.read(buf) > 0) {
-        buf.flip();
-        while (buf.hasRemaining()) {
-          offsetInFileOfCurrentByte++;
-          byte b = buf.get();
-          boolean reset = false;
-          if (recordStartBytesMatched) {
-            // We already matched "<recordElement"; read the next character to determine if this
-            // is a positive match for a new record.
-            charBytes[charBytesFound] = b;
-            charBytesFound++;
-            Character c = null;
-            if (charBytesFound == charBytes.length) {
-              CharBuffer charBuf = CharBuffer.allocate(1);
-              InputStream charBufStream = new ByteArrayInputStream(charBytes);
-              java.io.Reader reader =
-                  new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
-              int read = reader.read();
-              if (read <= 0) {
-                return -1;
-              }
-              charBuf.flip();
-              c = (char) read;
-            } else {
-              continue;
-            }
-
-            // A record start may take one of the following forms:
-            // * "<recordElement<whitespace>..."
-            // * "<recordElement>..."
-            // * "<recordElement/..."
-            if (Character.isWhitespace(c) || c == '>' || c == '/') {
-              fullyMatched = true;
-              // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
-              // already read from the channel.
-              preambleByteBuffer.write(recordStartBytes);
-              preambleByteBuffer.write(charBytes);
-              // Also add the rest of the current buffer to preambleByteBuffer.
-              while (buf.hasRemaining()) {
-                preambleByteBuffer.write(buf.get());
-              }
-              break outer;
-            } else {
-              // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
-              ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
-              newbuf.put(charBytes);
-              offsetInFileOfCurrentByte -= charBytes.length;
-              while (buf.hasRemaining()) {
-                newbuf.put(buf.get());
-              }
-              newbuf.flip();
-              buf = newbuf;
-
-              // Ignore everything and try again starting from the current buffer.
-              reset = true;
-            }
-          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
-            // Next byte matched.
-            if (!matchStarted) {
-              // Match was for the first byte, record the starting offset.
-              matchStarted = true;
-              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
-            }
-            byteIndexInRecordElementToMatch++;
-          } else {
-            // Not a match. Ignore everything and try again starting at current point.
-            reset = true;
-          }
-          if (reset) {
-            // Clear variables and try to match starting from the next byte.
-            byteIndexInRecordElementToMatch = 0;
-            startingOffsetInFileOfCurrentMatch = -1;
-            matchStarted = false;
-            recordStartBytesMatched = false;
-            charBytes = new byte[MAX_CHAR_BYTES];
-            charBytesFound = 0;
-          }
-          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
-            // "<recordElement" matched. Still need to check the next byte since this might be an
-            // element that has "recordElement" as a prefix.
-            recordStartBytesMatched = true;
-          }
-        }
-        buf.clear();
-      }
-
-      if (!fullyMatched) {
-        return -1;
-      } else {
-        return startingOffsetInFileOfCurrentMatch;
-      }
-    }
-
-    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
-      try {
-        // We use Woodstox because the StAX implementation provided by OpenJDK reports
-        // character locations incorrectly. Note that Woodstox still currently reports *byte*
-        // locations incorrectly when parsing documents that contain multi-byte characters.
-        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
-        this.parser = xmlInputFactory.createXMLStreamReader(
-            new SequenceInputStream(
-                new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
-            "UTF-8");
-
-        // Current offset should be the offset before reading the record element.
-        while (true) {
-          int event = parser.next();
-          if (event == XMLStreamConstants.START_ELEMENT) {
-            String localName = parser.getLocalName();
-            if (localName.equals(getCurrentSource().recordElement)) {
-              break;
-            }
-          }
-        }
-      } catch (FactoryConfigurationError | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    protected boolean readNextRecord() throws IOException {
-      if (emptyBundle) {
-        currentByteOffset = Long.MAX_VALUE;
-        return false;
-      }
-      try {
-        // Update current offset and check if the next value is the record element.
-        currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
-          parser.next();
-          currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
-            currentByteOffset = Long.MAX_VALUE;
-            return false;
-          }
-        }
-        JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
-        currentRecord = jb.getValue();
-        return true;
-      } catch (JAXBException | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (!readingStarted) {
-        throw new NoSuchElementException();
-      }
-      return currentRecord;
-    }
-
-    @Override
-    protected boolean isAtSplitPoint() {
-      // Every record is at a split point.
-      return true;
-    }
-
-    @Override
-    protected long getCurrentOffset() {
-      return currentByteOffset;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 0ce2a5e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,989 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.io.range.ByteKey;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source reads a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table")
- *         .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
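- *
- * <p>For example, a single {@code SetCell} mutation for one row might be built as in the
- * following sketch (the family, qualifier, timestamp, and value are illustrative):
- *
- * <pre>{@code
- * Mutation mutation = Mutation.newBuilder()
- *     .setSetCell(Mutation.SetCell.newBuilder()
- *         .setFamilyName("family")
- *         .setColumnQualifier(ByteString.copyFromUtf8("qualifier"))
- *         .setTimestampMicros(1000L)  // An explicit timestamp keeps the mutation idempotent.
- *         .setValue(ByteString.copyFromUtf8("value")))
- *     .build();
- * KV<ByteString, Iterable<Mutation>> entry =
- *     KV.<ByteString, Iterable<Mutation>>of(
- *         ByteString.copyFromUtf8("row-key"), ImmutableList.<Mutation>of(mutation));
- * }</pre>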
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- *     BigtableIO.write()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
-   * initialized with a
-   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
-   * specifies which table to read. A {@link RowFilter} may also optionally be specified using
-   * {@link BigtableIO.Read#withRowFilter}.
-   */
-  @Experimental
-  public static Read read() {
-    return new Read(null, "", null, null);
-  }
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
-   * initialized with a
-   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
-   * specifies which table to write to.
-   */
-  @Experimental
-  public static Write write() {
-    return new Write(null, "", null);
-  }
-
-  /**
-   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Read extends PTransform<PBegin, PCollection<Row>> {
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Read}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Read(optionsWithAgent, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
-     * using the given row filter.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withRowFilter(RowFilter filter) {
-      checkNotNull(filter, "filter");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being read from.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PCollection<Row> apply(PBegin input) {
-      BigtableSource source =
-          new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
-      return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Read.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .toString();
-    }
-
-    /////////////////////////////////////////////////////////////////////////////////////////
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * the source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final RowFilter filter;
-    @Nullable private final BigtableService bigtableService;
-
-    private Read(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable RowFilter filter,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.filter = filter;
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Read withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * Cloud Bigtable.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Write
-      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * the source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final BigtableService bigtableService;
-
-    private Write(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Write}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Write(optionsWithAgent, tableId, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being written to.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      Sink sink = new Sink(tableId, getBigtableService());
-      return input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink));
-    }
-
-    @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Write withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Write.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .toString();
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * Cloud Bigtable.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////
-  /** Disallow construction of utility class. */
-  private BigtableIO() {}
-
-  static class BigtableSource extends BoundedSource<Row> {
-    public BigtableSource(
-        BigtableService service,
-        String tableId,
-        @Nullable RowFilter filter,
-        ByteKeyRange range,
-        Long estimatedSizeBytes) {
-      this.service = service;
-      this.tableId = tableId;
-      this.filter = filter;
-      this.range = range;
-      this.estimatedSizeBytes = estimatedSizeBytes;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(BigtableSource.class)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .add("range", range)
-          .add("estimatedSizeBytes", estimatedSizeBytes)
-          .toString();
-    }
-
-    ////// Private state and internal implementation details //////
-    private final BigtableService service;
-    @Nullable private final String tableId;
-    @Nullable private final RowFilter filter;
-    private final ByteKeyRange range;
-    @Nullable private Long estimatedSizeBytes;
-    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
-
-    protected BigtableSource withStartKey(ByteKey startKey) {
-      checkNotNull(startKey, "startKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
-    }
-
-    protected BigtableSource withEndKey(ByteKey endKey) {
-      checkNotNull(endKey, "endKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
-    }
-
-    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
-      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
-      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
-    }
-
-    /**
-     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
-     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
-     * different tablets, and possibly generate sub-splits within tablets.
-     */
-    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
-      return service.getSampleRowKeys(this);
-    }
-
-    @Override
-    public List<BigtableSource> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      // Update the desiredBundleSizeBytes in order to limit the
-      // number of splits to maximumNumberOfSplits.
-      long maximumNumberOfSplits = 4000;
-      long sizeEstimate = getEstimatedSizeBytes(options);
-      desiredBundleSizeBytes =
-          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
-
-      // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
-    }
-
-    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
-    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
-        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
-      // There are no regions, or no samples available. Just scan the entire range.
-      if (sampleRowKeys.isEmpty()) {
-        logger.info("Not splitting source {} because no sample row keys are available.", this);
-        return Collections.singletonList(this);
-      }
-
-      logger.info(
-          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
-          desiredBundleSizeBytes,
-          sampleRowKeys.size(),
-          sampleRowKeys.get(0));
-
-      // Loop through all sampled responses and generate splits from the ones that overlap the
-      // scan range. The main complication is that we must track the end range of the previous
-      // sample to generate good ranges.
-      ByteKey lastEndKey = ByteKey.EMPTY;
-      long lastOffset = 0;
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      for (SampleRowKeysResponse response : sampleRowKeys) {
-        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
-        long responseOffset = response.getOffsetBytes();
-        checkState(
-            responseOffset >= lastOffset,
-            "Expected response byte offset %s to come after the last offset %s",
-            responseOffset,
-            lastOffset);
-
-        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
-          // This region does not overlap the scan, so skip it.
-          lastOffset = responseOffset;
-          lastEndKey = responseEndKey;
-          continue;
-        }
-
-        // Calculate the beginning of the split as the larger of startKey and the end of the last
-        // split. Unspecified start is smallest key so is correctly treated as earliest key.
-        ByteKey splitStartKey = lastEndKey;
-        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
-          splitStartKey = range.getStartKey();
-        }
-
-        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
-        // that range.containsKey handles the case when range.getEndKey() is empty.
-        ByteKey splitEndKey = responseEndKey;
-        if (!range.containsKey(splitEndKey)) {
-          splitEndKey = range.getEndKey();
-        }
-
-        // We know this region overlaps the desired key range, and we know a rough estimate of its
-        // size. Split the key range into bundle-sized chunks and then add them all as splits.
-        long sampleSizeBytes = responseOffset - lastOffset;
-        List<BigtableSource> subSplits =
-            splitKeyRangeIntoBundleSizedSubranges(
-                sampleSizeBytes,
-                desiredBundleSizeBytes,
-                ByteKeyRange.of(splitStartKey, splitEndKey));
-        splits.addAll(subSplits);
-
-        // Move to the next region.
-        lastEndKey = responseEndKey;
-        lastOffset = responseOffset;
-      }
-
-      // We must add one more region after the end of the samples if both these conditions hold:
-      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
-      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
-      if (!lastEndKey.isEmpty()
-          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
-        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
-      }
-
-      List<BigtableSource> ret = splits.build();
-      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
-      return ret;
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
-      // Delegate to testable helper.
-      if (estimatedSizeBytes == null) {
-        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Computes the estimated size in bytes based on the total size of all samples that overlap
-     * the key range this source will scan.
-     */
-    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
-      long estimatedSizeBytes = 0;
-      long lastOffset = 0;
-      ByteKey currentStartKey = ByteKey.EMPTY;
-      // Compute the total estimated size as the size of each sample that overlaps the scan range.
-      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
-      // filter or to sample on a given key range.
-      for (SampleRowKeysResponse response : samples) {
-        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
-        long currentOffset = response.getOffsetBytes();
-        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
-          // Skip an empty region.
-          lastOffset = currentOffset;
-          continue;
-        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
-          estimatedSizeBytes += currentOffset - lastOffset;
-        }
-        currentStartKey = currentEndKey;
-        lastOffset = currentOffset;
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Cloud Bigtable returns query results ordered by key.
-     */
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    @Override
-    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
-      return new BigtableReader(this, service);
-    }
-
-    @Override
-    public void validate() {
-      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
-    }
-
-    @Override
-    public Coder<Row> getDefaultOutputCoder() {
-      return Proto2Coder.of(Row.class);
-    }
-
-    /** Helper that splits the specified range in this source into bundles. */
-    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
-        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
-      // Catch the trivial cases. Split is small enough already, or this is the last region.
-      logger.debug(
-          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
-          sampleSizeBytes,
-          desiredBundleSizeBytes);
-      if (sampleSizeBytes <= desiredBundleSizeBytes) {
-        return Collections.singletonList(
-            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
-      }
-
-      checkArgument(
-          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
-      checkArgument(
-          desiredBundleSizeBytes > 0,
-          "Desired bundle size %s bytes must be greater than 0.",
-          desiredBundleSizeBytes);
-
-      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
-      List<ByteKey> splitKeys = range.split(splitCount);
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      Iterator<ByteKey> keys = splitKeys.iterator();
-      ByteKey prev = keys.next();
-      while (keys.hasNext()) {
-        ByteKey next = keys.next();
-        splits.add(
-            this
-                .withStartKey(prev)
-                .withEndKey(next)
-                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
-        prev = next;
-      }
-      return splits.build();
-    }
-
-    public ByteKeyRange getRange() {
-      return range;
-    }
-
-    public RowFilter getRowFilter() {
-      return filter;
-    }
-
-    public String getTableId() {
-      return tableId;
-    }
-  }
-
-  private static class BigtableReader extends BoundedReader<Row> {
-    // Thread-safety: source is protected via synchronization and is only accessed or modified
-    // inside a synchronized block (or constructor, which is the same).
-    private BigtableSource source;
-    private BigtableService service;
-    private BigtableService.Reader reader;
-    private final ByteKeyRangeTracker rangeTracker;
-    private long recordsReturned;
-
-    public BigtableReader(BigtableSource source, BigtableService service) {
-      this.source = source;
-      this.service = service;
-      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      reader = service.createReader(getCurrentSource());
-      boolean hasRecord =
-          reader.start()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    @Override
-    public synchronized BigtableSource getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      boolean hasRecord =
-          reader.advance()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    @Override
-    public Row getCurrent() throws NoSuchElementException {
-      return reader.getCurrentRow();
-    }
-
-    @Override
-    public void close() throws IOException {
-      logger.info("Closing reader after reading {} records.", recordsReturned);
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-
-    @Override
-    public final Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    @Override
-    public final synchronized BigtableSource splitAtFraction(double fraction) {
-      ByteKey splitKey;
-      try {
-        splitKey = source.getRange().interpolateKey(fraction);
-      } catch (IllegalArgumentException e) {
-        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
-        return null;
-      }
-      logger.debug(
-          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
-      if (!rangeTracker.trySplitAtPosition(splitKey)) {
-        return null;
-      }
-      BigtableSource primary = source.withEndKey(splitKey);
-      BigtableSource residual = source.withStartKey(splitKey);
-      this.source = primary;
-      return residual;
-    }
-  }
-
-  private static class Sink
-      extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-
-    public Sink(String tableId, BigtableService bigtableService) {
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
-    }
-
-    public String getTableId() {
-      return tableId;
-    }
-
-    public BigtableService getBigtableService() {
-      return bigtableService;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Sink.class)
-          .add("bigtableService", bigtableService)
-          .add("tableId", tableId)
-          .toString();
-    }
-
-    ///////////////////////////////////////////////////////////////////////////////
-    private final String tableId;
-    private final BigtableService bigtableService;
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
-        PipelineOptions options) {
-      return new BigtableWriteOperation(this);
-    }
-
-    /** Does nothing, as it is redundant with {@link Write#validate}. */
-    @Override
-    public void validate(PipelineOptions options) {}
-  }
-
-  private static class BigtableWriteOperation
-      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final Sink sink;
-
-    public BigtableWriteOperation(Sink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
-        throws Exception {
-      return new BigtableWriter(this);
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) {}
-
-    @Override
-    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
-      long count = 0;
-      for (Long value : writerResults) {
-        count += value;
-      }
-      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
-    }
-
-    @Override
-    public Sink getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<Long> getWriterResultCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final BigtableWriteOperation writeOperation;
-    private final Sink sink;
-    private BigtableService.Writer bigtableWriter;
-    private long recordsWritten;
-    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-
-    public BigtableWriter(BigtableWriteOperation writeOperation) {
-      this.writeOperation = writeOperation;
-      this.sink = writeOperation.getSink();
-      this.failures = new ConcurrentLinkedQueue<>();
-    }
-
-    @Override
-    public void open(String uId) throws Exception {
-      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
-      recordsWritten = 0;
-    }
-
-    /**
-     * If any write has asynchronously failed, fail the bundle with a useful error.
-     */
-    private void checkForFailures() throws IOException {
-      // Note that this function is never called by multiple threads and is the only place
-      // that removes elements from failures, so this code is safe.
-      if (failures.isEmpty()) {
-        return;
-      }
-
-      StringBuilder logEntry = new StringBuilder();
-      int i = 0;
-      for (; i < 10 && !failures.isEmpty(); ++i) {
-        BigtableWriteException exc = failures.remove();
-        logEntry.append("\n").append(exc.getMessage());
-        if (exc.getCause() != null) {
-          logEntry.append(": ").append(exc.getCause().getMessage());
-        }
-      }
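-      // The loop above drained i failures (at most 10); anything still queued was
-      // not logged, so the total observed so far is i + failures.size().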
-      String message =
-          String.format(
-              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
-              i + failures.size(),
-              i,
-              logEntry.toString());
-      logger.error(message);
-      throw new IOException(message);
-    }
-
-    @Override
-    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
-      checkForFailures();
-      Futures.addCallback(
-          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
-      ++recordsWritten;
-    }
-
-    @Override
-    public Long close() throws Exception {
-      bigtableWriter.close();
-      bigtableWriter = null;
-      checkForFailures();
-      logger.info("Wrote {} records", recordsWritten);
-      return recordsWritten;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
-      return writeOperation;
-    }
-
-    private class WriteExceptionCallback implements FutureCallback<Empty> {
-      private final KV<ByteString, Iterable<Mutation>> value;
-
-      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
-        this.value = value;
-      }
-
-      @Override
-      public void onFailure(Throwable cause) {
-        failures.add(new BigtableWriteException(value, cause));
-      }
-
-      @Override
-      public void onSuccess(Empty produced) {}
-    }
-  }
-
-  /**
-   * An exception whose message includes information about the record that failed to be written.
-   */
-  static class BigtableWriteException extends IOException {
-    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
-      super(
-          String.format(
-              "Error mutating row %s with mutations %s",
-              record.getKey().toStringUtf8(),
-              record.getValue()),
-          cause);
-    }
-  }
-
-  /**
-   * A helper function to produce a Cloud Bigtable user agent string.
-   */
-  private static String getUserAgent() {
-    String javaVersion = System.getProperty("java.specification.version");
-    DataflowReleaseInfo info = DataflowReleaseInfo.getReleaseInfo();
-    return String.format(
-        "%s/%s (%s); %s",
-        info.getName(),
-        info.getVersion(),
-        javaVersion,
-        "0.2.3" /* TODO get Bigtable client version directly from jar. */);
-  }
-}

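For reference, a minimal sketch of how a runner might exercise the dynamic work
rebalancing hooks that BigtableReader implements above. BigtableReader,
BigtableSource, and every reader method come from the deleted file; process()
and scheduleForAnotherWorker() are hypothetical, and a real runner invokes
splitAtFraction() from a separate thread rather than inline as shown here.

    BigtableReader reader = new BigtableReader(source, service);
    for (boolean more = reader.start(); more; more = reader.advance()) {
      process(reader.getCurrent());  // hypothetical per-row callback

      // The runner may ask for the remaining work to be split. A null return
      // means the split was rejected, e.g. the key for the requested fraction
      // could not be interpolated or the tracker had already passed it.
      BigtableSource residual = reader.splitAtFraction(0.75);
      if (residual != null) {
        scheduleForAnotherWorker(residual);  // this reader keeps the primary half
      }
    }
    reader.close();
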
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index 0c47f65..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An interface for real or fake implementations of Cloud Bigtable.
- */
-interface BigtableService extends Serializable {
-
-  /**
-   * The interface of a class that can write to Cloud Bigtable.
-   */
-  interface Writer {
-    /**
-     * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
-     * row key to be mutated, and the iterable of mutations represents the changes to be made
-     * to the row.
-     *
-     * @throws IOException if there is an error submitting the write.
-     */
-    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException;
-
-    /**
-     * Closes the writer.
-     *
-     * @throws IOException if any writes did not succeed
-     */
-    void close() throws IOException;
-  }
-
-  /**
-   * The interface of a class that reads from Cloud Bigtable.
-   */
-  interface Reader {
-    /**
-     * Reads the first element (including initialization, such as opening a network connection) and
-     * returns true if an element was found.
-     */
-    boolean start() throws IOException;
-
-    /**
-     * Attempts to read the next element, and returns true if an element has been read.
-     */
-    boolean advance() throws IOException;
-
-    /**
-     * Closes the reader.
-     *
-     * @throws IOException if there is an error.
-     */
-    void close() throws IOException;
-
-    /**
-     * Returns the last row read by a successful start() or advance(), or throws if there is no
-     * current row because the last such call was unsuccessful.
-     */
-    Row getCurrentRow() throws NoSuchElementException;
-  }
-
-  /**
-   * Returns {@code true} if the table with the given name exists.
-   */
-  boolean tableExists(String tableId) throws IOException;
-
-  /**
-   * Returns a {@link Reader} that will read from the specified source.
-   */
-  Reader createReader(BigtableSource source) throws IOException;
-
-  /**
-   * Returns a {@link Writer} that will write to the specified table.
-   */
-  Writer openForWriting(String tableId) throws IOException;
-
-  /**
-   * Returns a set of row keys sampled from the underlying table. These contain information about
-   * the distribution of keys within the table.
-   */
-  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-}

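For reference, a minimal sketch of the call protocol that BigtableService.Reader
defines above, assuming a service and source are already in hand; process() is a
hypothetical callback.

    BigtableService.Reader reader = service.createReader(source);
    try {
      // start() opens the connection and positions on the first row, if any;
      // advance() steps to the next row. Both return false once exhausted.
      for (boolean more = reader.start(); more; more = reader.advance()) {
        process(reader.getCurrentRow());
      }
    } finally {
      reader.close();
    }
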
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 9f32022..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- */
-class BigtableServiceImpl implements BigtableService {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
-
-  public BigtableServiceImpl(BigtableOptions options) {
-    this.options = options;
-  }
-
-  private final BigtableOptions options;
-
-  @Override
-  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    String tableName = options.getClusterName().toTableNameStr(tableId);
-    return new BigtableWriterImpl(session, tableName);
-  }
-
-  @Override
-  public boolean tableExists(String tableId) throws IOException {
-    if (!BigtableSession.isAlpnProviderEnabled()) {
-      logger.info(
-          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
-              + " configured.",
-          tableId,
-          options);
-      return true;
-    }
-
-    try (BigtableSession session = new BigtableSession(options)) {
-      GetTableRequest getTable =
-          GetTableRequest.newBuilder()
-              .setName(options.getClusterName().toTableNameStr(tableId))
-              .build();
-      session.getTableAdminClient().getTable(getTable);
-      return true;
-    } catch (StatusRuntimeException e) {
-      if (e.getStatus().getCode() == Code.NOT_FOUND) {
-        return false;
-      }
-      String message =
-          String.format(
-              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
-      logger.error(message, e);
-      throw new IOException(message, e);
-    }
-  }
-
-  private class BigtableReaderImpl implements Reader {
-    private BigtableSession session;
-    private final BigtableSource source;
-    private ResultScanner<Row> results;
-    private Row currentRow;
-
-    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
-      this.session = session;
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      RowRange range =
-          RowRange.newBuilder()
-              .setStartKey(source.getRange().getStartKey().getValue())
-              .setEndKey(source.getRange().getEndKey().getValue())
-              .build();
-      ReadRowsRequest.Builder requestB =
-          ReadRowsRequest.newBuilder()
-              .setRowRange(range)
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
-      if (source.getRowFilter() != null) {
-        requestB.setFilter(source.getRowFilter());
-      }
-      results = session.getDataClient().readRows(requestB.build());
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      currentRow = results.next();
-      return (currentRow != null);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Goal: by the end of this function, both results and session are null and closed,
-      // independent of what errors they throw or prior state.
-
-      if (session == null) {
-        // Only possible when previously closed, so we know that results is also null.
-        return;
-      }
-
-      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
-      // the Closer, but we can use the Closer to simplify the error handling.
-      try (Closer closer = Closer.create()) {
-        if (results != null) {
-          closer.register(results);
-          results = null;
-        }
-
-        session.close();
-      } finally {
-        session = null;
-      }
-    }
-
-    @Override
-    public Row getCurrentRow() throws NoSuchElementException {
-      if (currentRow == null) {
-        throw new NoSuchElementException();
-      }
-      return currentRow;
-    }
-  }
-
-  private static class BigtableWriterImpl implements Writer {
-    private BigtableSession session;
-    private AsyncExecutor executor;
-    private final MutateRowRequest.Builder partialBuilder;
-
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
-      this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (executor != null) {
-          executor.flush();
-          executor = null;
-        }
-      } finally {
-        if (session != null) {
-          session.close();
-          session = null;
-        }
-      }
-    }
-
-    @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException {
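-      // clone() the shared, partially-built request so the preset table name is
-      // reused while each record gets its own row key and mutations.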
-      MutateRowRequest r =
-          partialBuilder
-              .clone()
-              .setRowKey(record.getKey())
-              .addAllMutations(record.getValue())
-              .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects
-        .toStringHelper(BigtableServiceImpl.class)
-        .add("options", options)
-        .toString();
-  }
-
-  @Override
-  public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    return new BigtableReaderImpl(session, source);
-  }
-
-  @Override
-  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
-    try (BigtableSession session = new BigtableSession(options)) {
-      SampleRowKeysRequest request =
-          SampleRowKeysRequest.newBuilder()
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
-              .build();
-      return session.getDataClient().sampleRowKeys(request);
-    }
-  }
-}

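For reference, a sketch of driving the asynchronous Writer the way BigtableWriter
in BigtableIO.java does above. Futures, FutureCallback, and ListenableFuture are
from Guava (com.google.common.util.concurrent); Empty is com.google.protobuf.Empty;
service, tableId, rowKey, mutations, and handleFailure() are assumed to be in scope.

    BigtableService.Writer writer = service.openForWriting(tableId);
    ListenableFuture<Empty> result = writer.writeRecord(KV.of(rowKey, mutations));
    Futures.addCallback(result, new FutureCallback<Empty>() {
      @Override
      public void onSuccess(Empty produced) {}

      @Override
      public void onFailure(Throwable cause) {
        handleFailure(cause);  // e.g. queue it for a later checkForFailures(), as above
      }
    });
    // close() flushes all pending mutations and throws if any write did not succeed.
    writer.close();
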
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
deleted file mode 100644
index 553f46c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines transforms for reading and writing from Google Cloud Bigtable.
- *
- * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
deleted file mode 100644
index 5f0050d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines transforms for reading and writing common storage formats, including
- * {@link com.google.cloud.dataflow.sdk.io.AvroIO},
- * {@link com.google.cloud.dataflow.sdk.io.BigQueryIO}, and
- * {@link com.google.cloud.dataflow.sdk.io.TextIO}.
- *
- * <p>The classes in this package provide {@code Read} transforms that create PCollections
- * from existing storage:
- * <pre>{@code
- * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations");
- * }</pre>
- * and {@code Write} transforms that persist PCollections to external storage:
- * <pre> {@code
- * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers"));
- * } </pre>
- */
-package com.google.cloud.dataflow.sdk.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
deleted file mode 100644
index 5b9a003..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ByteString.ByteIterator;
-
-import java.io.Serializable;
-
-/**
- * A class representing a key consisting of an array of bytes. Arbitrary-length
- * {@code byte[]} keys are typical in key-value stores such as Google Cloud Bigtable.
- *
- * <p>Instances of {@link ByteKey} are immutable.
- *
- * <p>{@link ByteKey} implements {@link Comparable Comparable&lt;ByteKey&gt;} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the successor
- * to a key is the same key with an additional 0 byte appended; and keys have unbounded size.
- *
- * <p>Note that the empty {@link ByteKey} compares smaller than all other keys, but some systems
- * have the semantic that when an empty {@link ByteKey} is used as an upper bound, it represents
- * the largest possible key. In these cases, implementors should use {@link #isEmpty} to test
- * whether an upper bound key is empty.
- */
-public final class ByteKey implements Comparable<ByteKey>, Serializable {
-  /** An empty key. */
-  public static final ByteKey EMPTY = ByteKey.of();
-
-  /**
-   * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
-   */
-  public static ByteKey of(ByteString value) {
-    return new ByteKey(value);
-  }
-
-  /**
-   * Creates a new {@link ByteKey} backed by a copy of the specified {@code byte[]}.
-   *
-   * <p>Makes a copy of the underlying array.
-   */
-  public static ByteKey copyFrom(byte[] bytes) {
-    return of(ByteString.copyFrom(bytes));
-  }
-
-  /**
-   * Creates a new {@link ByteKey} backed by a copy of the specified {@code int[]}. This method is
-   * primarily used as a convenience to create a {@link ByteKey} in code without casting down to
-   * signed Java {@link Byte bytes}:
-   *
-   * <pre>{@code
-   * ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);
-   * }</pre>
-   *
-   * <p>Makes a copy of the input.
-   */
-  public static ByteKey of(int... bytes) {
-    byte[] ret = new byte[bytes.length];
-    for (int i = 0; i < bytes.length; ++i) {
-      ret[i] = (byte) (bytes[i] & 0xff);
-    }
-    return ByteKey.copyFrom(ret);
-  }
-
-  /**
-   * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
-   *
-   * <p>Does not copy.
-   */
-  public ByteString getValue() {
-    return value;
-  }
-
-  /**
-   * Returns a newly-allocated {@code byte[]} representing this {@link ByteKey}.
-   *
-   * <p>Copies the underlying {@code byte[]}.
-   */
-  public byte[] getBytes() {
-    return value.toByteArray();
-  }
-
-  /**
-   * Returns {@code true} if the {@code byte[]} backing this {@link ByteKey} is of length 0.
-   */
-  public boolean isEmpty() {
-    return value.isEmpty();
-  }
-
-  /**
-   * {@link ByteKey} implements {@link Comparable Comparable&lt;ByteKey&gt;} by comparing the
-   * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the
-   * successor to a key is the same key with an additional 0 byte appended; and keys have unbounded
-   * size.
-   */
-  @Override
-  public int compareTo(ByteKey other) {
-    checkNotNull(other, "other");
-    ByteIterator thisIt = value.iterator();
-    ByteIterator otherIt = other.value.iterator();
-    while (thisIt.hasNext() && otherIt.hasNext()) {
-      // (byte & 0xff) converts [-128,127] bytes to [0,255] ints.
-      int cmp = (thisIt.nextByte() & 0xff) - (otherIt.nextByte() & 0xff);
-      if (cmp != 0) {
-        return cmp;
-      }
-    }
-    // If we get here, the prefix of both arrays is equal up to the shorter array. The array with
-    // more bytes is larger.
-    return value.size() - other.value.size();
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  private final ByteString value;
-
-  private ByteKey(ByteString value) {
-    this.value = value;
-  }
-
-  /** Array used as a helper in {@link #toString}. */
-  private static final char[] HEX =
-      new char[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
-
-  // Prints the key as a string "[deadbeef]".
-  @Override
-  public String toString() {
-    char[] encoded = new char[2 * value.size() + 2];
-    encoded[0] = '[';
-    int cnt = 1;
-    ByteIterator iterator = value.iterator();
-    while (iterator.hasNext()) {
-      byte b = iterator.nextByte();
-      encoded[cnt] = HEX[(b & 0xF0) >>> 4];
-      ++cnt;
-      encoded[cnt] = HEX[b & 0xF];
-      ++cnt;
-    }
-    encoded[cnt] = ']';
-    return new String(encoded);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (!(o instanceof ByteKey)) {
-      return false;
-    }
-    ByteKey other = (ByteKey) o;
-    return (other.value.size() == value.size()) && this.compareTo(other) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return value.hashCode();
-  }
-}
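
For reference, a few assertions illustrating the ordering contract documented
above: bytes compare unsigned, the empty key is smallest, and a key sorts before
any extension of itself. The demo class name is arbitrary, and asserts require
running with -ea.

    import com.google.cloud.dataflow.sdk.io.range.ByteKey;

    public class ByteKeyOrderingDemo {
      public static void main(String[] args) {
        assert ByteKey.EMPTY.compareTo(ByteKey.of(0x00)) < 0;          // empty key is smallest
        assert ByteKey.of(0x7f).compareTo(ByteKey.of(0x80)) < 0;       // bytes compare unsigned
        assert ByteKey.of(0x01).compareTo(ByteKey.of(0x01, 0x00)) < 0; // a prefix sorts first
        assert ByteKey.of(0xde, 0xad).toString().equals("[dead]");     // hex rendering
      }
    }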