You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:47:59 UTC
[12/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
deleted file mode 100644
index eae5e8b..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.common.base.Preconditions;
-
-import org.codehaus.stax2.XMLInputFactory2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.ValidationEvent;
-import javax.xml.bind.ValidationEventHandler;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@code PCollection} of a given type. An Dataflow read transform can be
- * created by passing an {@code XmlSource} object to {@code Read.from()}. Please note the
- * example given below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>Basically, the XML document should contain a single root element with an inner list consisting
- * entirely of record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
- * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading
- * the source will generate a {@code PCollection} of the given JAXB annotated Java type.
- * Optionally users may provide a minimum size of a bundle that should be created for the source.
- *
- * <p>The following example shows how to read from {@link XmlSource} in a Dataflow pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString())
- * .withRootElement("root")
- * .withRecordElement("record")
- * .withRecordClass(Record.class);
- * PCollection<String> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- * <li>Include a compatible implementation on the classpath at run-time,
- * such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies have been declared as optional in Maven sdk/pom.xml file of
- * Google Cloud Dataflow.
- *
- * <p><h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- * {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
-public class XmlSource<T> extends FileBasedSource<T> {
-
- private static final String XML_VERSION = "1.1";
- private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
- private final String rootElement;
- private final String recordElement;
- private final Class<T> recordClass;
-
- /**
- * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
- * pattern. Each XML file should be of the form defined in {@link XmlSource}.
- */
- public static <T> XmlSource<T> from(String fileOrPatternSpec) {
- return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
- }
-
- /**
- * Sets name of the root element of the XML document. This will be used to create a valid starting
- * root element when initiating a bundle of records created from an XML document. This is a
- * required parameter.
- */
- public XmlSource<T> withRootElement(String rootElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets name of the record element of the XML document. This will be used to determine offset of
- * the first record of a bundle created from the XML document. This is a required parameter.
- */
- public XmlSource<T> withRecordElement(String recordElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
- * will be used when unmarshalling record objects from the XML file. This is a required
- * parameter.
- */
- public XmlSource<T> withRecordClass(Class<T> recordClass) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
- * to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
- * parameter.
- */
- public XmlSource<T> withMinBundleSize(long minBundleSize) {
- return new XmlSource<>(
- getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
- String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
- String rootElement, String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize, startOffset, endOffset);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
- }
-
- @Override
- protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
- return new XmlSource<T>(
- fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
- }
-
- @Override
- protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
- return new XMLReader<T>(this);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return false;
- }
-
- @Override
- public void validate() {
- super.validate();
- Preconditions.checkNotNull(
- rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
- Preconditions.checkNotNull(
- recordElement,
- "recordElement is null. Use builder method withRecordElement() to set this.");
- Preconditions.checkNotNull(
- recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return JAXBCoder.of(recordClass);
- }
-
- public String getRootElement() {
- return rootElement;
- }
-
- public String getRecordElement() {
- return recordElement;
- }
-
- public Class<T> getRecordClass() {
- return recordClass;
- }
-
- /**
- * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
- * file should be of the form defined at {@link XmlSource}.
- *
- * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
- * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
- *
- * @param <T> Type of objects that will be read by the reader.
- */
- private static class XMLReader<T> extends FileBasedReader<T> {
- // The amount of bytes read from the channel to memory when determining the starting offset of
- // the first record in a bundle. After matching to starting offset of the first record the
- // remaining bytes read to this buffer and the bytes still not read from the channel are used to
- // create the XML parser.
- private static final int BUF_SIZE = 1024;
-
- // This should be the maximum number of bytes a character will encode to, for any encoding
- // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
- // four bytes.
- private static final int MAX_CHAR_BYTES = 4;
-
- // In order to support reading starting in the middle of an XML file, we construct an imaginary
- // well-formed document (a header and root tag followed by the contents of the input starting at
- // the record boundary) and feed it to the parser. Because of this, the offset reported by the
- // XML parser is not the same as offset in the original file. They differ by a constant amount:
- // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
- // Note that this is true only for files with single-byte characters.
- // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
- // reporting byte offsets of elements in the presence of multi-byte characters.
- private long parserBaseOffset = 0;
- private boolean readingStarted = false;
-
- // If true, the current bundle does not contain any records.
- private boolean emptyBundle = false;
-
- private Unmarshaller jaxbUnmarshaller = null;
- private XMLStreamReader parser = null;
-
- private T currentRecord = null;
-
- // Byte offset of the current record in the XML file provided when creating the source.
- private long currentByteOffset = 0;
-
- public XMLReader(XmlSource<T> source) {
- super(source);
-
- // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
- try {
- JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
- jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
- // Throw errors if validation fails. JAXB by default ignores validation errors.
- jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
- @Override
- public boolean handleEvent(ValidationEvent event) {
- throw new RuntimeException(event.getMessage(), event.getLinkedException());
- }
- });
- } catch (JAXBException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public synchronized XmlSource<T> getCurrentSource() {
- return (XmlSource<T>) super.getCurrentSource();
- }
-
- @Override
- protected void startReading(ReadableByteChannel channel) throws IOException {
- // This method determines the correct starting offset of the first record by reading bytes
- // from the ReadableByteChannel. This implementation does not need the channel to be a
- // SeekableByteChannel.
- // The method tries to determine the first record element in the byte channel. The first
- // record must start with the characters "<recordElement" where "recordElement" is the
- // record element of the XML document described above. For the match to be complete this
- // has to be followed by one of following.
- // * any whitespace character
- // * '>' character
- // * '/' character (to support empty records).
- //
- // After this match this method creates the XML parser for parsing the XML document,
- // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
- // by the contents of channel starting from <recordElement. The <rootElement> tag may be never
- // closed.
-
- // This stores any bytes that should be used prior to the remaining bytes of the channel when
- // creating an XML parser object.
- ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
- // A dummy declaration and root for the document with proper XML version and encoding. Without
- // this XML parsing may fail or may produce incorrect results.
-
- byte[] dummyStartDocumentBytes =
- ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
- + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
- preambleByteBuffer.write(dummyStartDocumentBytes);
- // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
- // method returns the offset and stores any bytes that should be used when creating the XML
- // parser in preambleByteBuffer.
- long offsetInFileOfRecordElement =
- getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
- if (offsetInFileOfRecordElement < 0) {
- // Bundle has no records. So marking this bundle as an empty bundle.
- emptyBundle = true;
- return;
- } else {
- byte[] preambleBytes = preambleByteBuffer.toByteArray();
- currentByteOffset = offsetInFileOfRecordElement;
- setUpXMLParser(channel, preambleBytes);
- parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
- }
- readingStarted = true;
- }
-
- // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
- // any bytes read past the starting offset of the next record back to the preambleByteBuffer.
- // If a record is found, returns the starting offset of the record, otherwise
- // returns -1.
- private long getFirstOccurenceOfRecordElement(
- ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
- int byteIndexInRecordElementToMatch = 0;
- // Index of the byte in the string "<recordElement" to be matched
- // against the current byte from the stream.
- boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the
- // next character to confirm if this is a positive match.
- boolean fullyMatched = false; // If true, record element was fully matched.
-
- // This gives the offset of the byte currently being read. We do a '-1' here since we
- // increment this value at the beginning of the while loop below.
- long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
- long startingOffsetInFileOfCurrentMatch = -1;
- // If this is non-negative, currently there is a match in progress and this value gives the
- // starting offset of the match currently being conducted.
- boolean matchStarted = false; // If true, a match is currently in progress.
-
- // These two values are used to determine the character immediately following a match for
- // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
- byte[] charBytes = new byte[MAX_CHAR_BYTES];
- int charBytesFound = 0;
-
- ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
- byte[] recordStartBytes =
- ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
-
- outer: while (channel.read(buf) > 0) {
- buf.flip();
- while (buf.hasRemaining()) {
- offsetInFileOfCurrentByte++;
- byte b = buf.get();
- boolean reset = false;
- if (recordStartBytesMatched) {
- // We already matched "<recordElement" reading the next character to determine if this
- // is a positive match for a new record.
- charBytes[charBytesFound] = b;
- charBytesFound++;
- Character c = null;
- if (charBytesFound == charBytes.length) {
- CharBuffer charBuf = CharBuffer.allocate(1);
- InputStream charBufStream = new ByteArrayInputStream(charBytes);
- java.io.Reader reader =
- new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
- int read = reader.read();
- if (read <= 0) {
- return -1;
- }
- charBuf.flip();
- c = (char) read;
- } else {
- continue;
- }
-
- // Record start may be of following forms
- // * "<recordElement<whitespace>..."
- // * "<recordElement>..."
- // * "<recordElement/..."
- if (Character.isWhitespace(c) || c == '>' || c == '/') {
- fullyMatched = true;
- // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
- // already read from the channel.
- preambleByteBuffer.write(recordStartBytes);
- preambleByteBuffer.write(charBytes);
- // Also add the rest of the current buffer to preambleByteBuffer.
- while (buf.hasRemaining()) {
- preambleByteBuffer.write(buf.get());
- }
- break outer;
- } else {
- // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
- ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
- newbuf.put(charBytes);
- offsetInFileOfCurrentByte -= charBytes.length;
- while (buf.hasRemaining()) {
- newbuf.put(buf.get());
- }
- newbuf.flip();
- buf = newbuf;
-
- // Ignore everything and try again starting from the current buffer.
- reset = true;
- }
- } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
- // Next byte matched.
- if (!matchStarted) {
- // Match was for the first byte, record the starting offset.
- matchStarted = true;
- startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
- }
- byteIndexInRecordElementToMatch++;
- } else {
- // Not a match. Ignore everything and try again starting at current point.
- reset = true;
- }
- if (reset) {
- // Clear variables and try to match starting from the next byte.
- byteIndexInRecordElementToMatch = 0;
- startingOffsetInFileOfCurrentMatch = -1;
- matchStarted = false;
- recordStartBytesMatched = false;
- charBytes = new byte[MAX_CHAR_BYTES];
- charBytesFound = 0;
- }
- if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
- // "<recordElement" matched. Need to still check next byte since this might be an
- // element that has "recordElement" as a prefix.
- recordStartBytesMatched = true;
- }
- }
- buf.clear();
- }
-
- if (!fullyMatched) {
- return -1;
- } else {
- return startingOffsetInFileOfCurrentMatch;
- }
- }
-
- private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
- try {
- // We use Woodstox because the StAX implementation provided by OpenJDK reports
- // character locations incorrectly. Note that Woodstox still currently reports *byte*
- // locations incorrectly when parsing documents that contain multi-byte characters.
- XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
- this.parser = xmlInputFactory.createXMLStreamReader(
- new SequenceInputStream(
- new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
- "UTF-8");
-
- // Current offset should be the offset before reading the record element.
- while (true) {
- int event = parser.next();
- if (event == XMLStreamConstants.START_ELEMENT) {
- String localName = parser.getLocalName();
- if (localName.equals(getCurrentSource().recordElement)) {
- break;
- }
- }
- }
- } catch (FactoryConfigurationError | XMLStreamException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- protected boolean readNextRecord() throws IOException {
- if (emptyBundle) {
- currentByteOffset = Long.MAX_VALUE;
- return false;
- }
- try {
- // Update current offset and check if the next value is the record element.
- currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
- while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
- parser.next();
- currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
- if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
- currentByteOffset = Long.MAX_VALUE;
- return false;
- }
- }
- JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
- currentRecord = jb.getValue();
- return true;
- } catch (JAXBException | XMLStreamException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (!readingStarted) {
- throw new NoSuchElementException();
- }
- return currentRecord;
- }
-
- @Override
- protected boolean isAtSplitPoint() {
- // Every record is at a split point.
- return true;
- }
-
- @Override
- protected long getCurrentOffset() {
- return currentByteOffset;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 0ce2a5e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,989 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.io.range.ByteKey;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table")
- * .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- * BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
- private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
  /**
   * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
   * initialized with a
   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
   * specifies which table to read. A {@link RowFilter} may also optionally be specified using
   * {@link BigtableIO.Read#withRowFilter}.
   */
  @Experimental
  public static Read read() {
    // Empty-string table id is the "not yet configured" sentinel; Read.validate() rejects it.
    return new Read(null, "", null, null);
  }
-
  /**
   * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
   * initialized with a
   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
   * specifies which table to write.
   */
  @Experimental
  public static Write write() {
    // Empty-string table id is the "not yet configured" sentinel, mirroring read().
    return new Write(null, "", null);
  }
-
- /**
- * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
- * {@link BigtableIO} for more information.
- *
- * @see BigtableIO
- */
- @Experimental
- public static class Read extends PTransform<PBegin, PCollection<Row>> {
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "options");
- return withBigtableOptions(options.toBuilder());
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Clones the given {@link BigtableOptions} builder so that any further changes
- * will have no effect on the returned {@link BigtableIO.Read}.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
- checkNotNull(optionsBuilder, "optionsBuilder");
- // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Read(optionsWithAgent, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
- * using the given row filter.
- *
- * <p>Does not modify this object.
- */
- public Read withRowFilter(RowFilter filter) {
- checkNotNull(filter, "filter");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the specified table.
- *
- * <p>Does not modify this object.
- */
- public Read withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Read(options, tableId, filter, bigtableService);
- }
-
    /**
     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
     * May be {@code null} if {@link #withBigtableOptions} has not been called yet.
     */
    public BigtableOptions getBigtableOptions() {
      return options;
    }
-
    /**
     * Returns the table being read from. Empty string if {@link #withTableId} has not been
     * called yet.
     */
    public String getTableId() {
      return tableId;
    }
-
- @Override
- public PCollection<Row> apply(PBegin input) {
- BigtableSource source =
- new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
- return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
- }
-
- @Override
- public void validate(PBegin input) {
- checkArgument(options != null, "BigtableOptions not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try {
- checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
- } catch (IOException e) {
- logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Read.class)
- .add("options", options)
- .add("tableId", tableId)
- .add("filter", filter)
- .toString();
- }
-
    /////////////////////////////////////////////////////////////////////////////////////////
    /**
     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
     * source is being built.
     */
    @Nullable private final BigtableOptions options;
    // Never null: validated with checkNotNull in the constructor; emptiness checked in validate().
    private final String tableId;
    // Optional server-side row filter; null means no filtering.
    @Nullable private final RowFilter filter;
    // Test-only service override (see withBigtableService); null means use BigtableServiceImpl.
    @Nullable private final BigtableService bigtableService;
-
    /**
     * Primary constructor; all {@code with*} methods funnel through here.
     *
     * @param options cluster/network settings; may be null while the transform is being built
     * @param tableId table to read from; must be non-null
     * @param filter optional row filter; may be null
     * @param bigtableService test-only service override; may be null
     */
    private Read(
        @Nullable BigtableOptions options,
        String tableId,
        @Nullable RowFilter filter,
        @Nullable BigtableService bigtableService) {
      this.options = options;
      this.tableId = checkNotNull(tableId, "tableId");
      this.filter = filter;
      this.bigtableService = bigtableService;
    }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
- * service implementation.
- *
- * <p>This is used for testing.
- *
- * <p>Does not modify this object.
- */
- Read withBigtableService(BigtableService bigtableService) {
- checkNotNull(bigtableService, "bigtableService");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Helper function that either returns the mock Bigtable service supplied by
- * {@link #withBigtableService} or creates and returns an implementation that talks to
- * {@code Cloud Bigtable}.
- */
- private BigtableService getBigtableService() {
- if (bigtableService != null) {
- return bigtableService;
- }
- return new BigtableServiceImpl(options);
- }
- }
-
- /**
- * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
- * {@link BigtableIO} for more information.
- *
- * @see BigtableIO
- */
- @Experimental
- public static class Write
- extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
- /**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
- * Cannot actually be {@code null} at validation time, but may start out {@code null} while
- * source is being built.
- */
- @Nullable private final BigtableOptions options;
- private final String tableId;
- @Nullable private final BigtableService bigtableService;
-
- private Write(
- @Nullable BigtableOptions options,
- String tableId,
- @Nullable BigtableService bigtableService) {
- this.options = options;
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = bigtableService;
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Write withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "options");
- return withBigtableOptions(options.toBuilder());
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Clones the given {@link BigtableOptions} builder so that any further changes
- * will have no effect on the returned {@link BigtableIO.Write}.
- *
- * <p>Does not modify this object.
- */
- public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
- checkNotNull(optionsBuilder, "optionsBuilder");
- // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Write(optionsWithAgent, tableId, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Write(options, tableId, bigtableService);
- }
-
- /**
- * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
- */
- public BigtableOptions getBigtableOptions() {
- return options;
- }
-
- /**
- * Returns the table being written to.
- */
- public String getTableId() {
- return tableId;
- }
-
- @Override
- public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- Sink sink = new Sink(tableId, getBigtableService());
- return input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink));
- }
-
- @Override
- public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- checkArgument(options != null, "BigtableOptions not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try {
- checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
- } catch (IOException e) {
- logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
- * service implementation.
- *
- * <p>This is used for testing.
- *
- * <p>Does not modify this object.
- */
- Write withBigtableService(BigtableService bigtableService) {
- checkNotNull(bigtableService, "bigtableService");
- return new Write(options, tableId, bigtableService);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Write.class)
- .add("options", options)
- .add("tableId", tableId)
- .toString();
- }
-
- /**
- * Helper function that either returns the mock Bigtable service supplied by
- * {@link #withBigtableService} or creates and returns an implementation that talks to
- * {@code Cloud Bigtable}.
- */
- private BigtableService getBigtableService() {
- if (bigtableService != null) {
- return bigtableService;
- }
- return new BigtableServiceImpl(options);
- }
- }
-
  //////////////////////////////////////////////////////////////////////////////////////////
  /** Disallow construction of utility class: only the static nested transforms are used. */
  private BigtableIO() {}
-
  /**
   * A {@link BoundedSource} over the rows of a single Cloud Bigtable table, restricted to the
   * key {@code range} and optionally filtered server-side by a {@link RowFilter}. Splitting is
   * driven by the service's sampled row keys.
   */
  static class BigtableSource extends BoundedSource<Row> {
    /**
     * @param service service used for reads and row-key sampling
     * @param tableId the table to scan
     * @param filter optional server-side row filter; may be null
     * @param range the key range to scan
     * @param estimatedSizeBytes cached size estimate; may be null, in which case it is computed
     *     lazily from sampled row keys in {@link #getEstimatedSizeBytes}
     */
    public BigtableSource(
        BigtableService service,
        String tableId,
        @Nullable RowFilter filter,
        ByteKeyRange range,
        Long estimatedSizeBytes) {
      this.service = service;
      this.tableId = tableId;
      this.filter = filter;
      this.range = range;
      this.estimatedSizeBytes = estimatedSizeBytes;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(BigtableSource.class)
          .add("tableId", tableId)
          .add("filter", filter)
          .add("range", range)
          .add("estimatedSizeBytes", estimatedSizeBytes)
          .toString();
    }

    ////// Private state and internal implementation details //////
    private final BigtableService service;
    @Nullable private final String tableId;
    @Nullable private final RowFilter filter;
    private final ByteKeyRange range;
    // Mutable cache: filled in by getEstimatedSizeBytes on first use.
    @Nullable private Long estimatedSizeBytes;
    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;

    /** Returns a copy of this source restricted to start at {@code startKey}. */
    protected BigtableSource withStartKey(ByteKey startKey) {
      checkNotNull(startKey, "startKey");
      return new BigtableSource(
          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
    }

    /** Returns a copy of this source restricted to end at {@code endKey}. */
    protected BigtableSource withEndKey(ByteKey endKey) {
      checkNotNull(endKey, "endKey");
      return new BigtableSource(
          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
    }

    /** Returns a copy of this source with the size-estimate cache replaced. */
    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
    }

    /**
     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
     * different tablets, and possibly generate sub-splits within tablets.
     */
    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
      return service.getSampleRowKeys(this);
    }

    @Override
    public List<BigtableSource> splitIntoBundles(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      // Update the desiredBundleSizeBytes in order to limit the
      // number of splits to maximumNumberOfSplits.
      long maximumNumberOfSplits = 4000;
      long sizeEstimate = getEstimatedSizeBytes(options);
      desiredBundleSizeBytes =
          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);

      // Delegate to testable helper.
      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
    }

    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
      // There are no regions, or no samples available. Just scan the entire range.
      if (sampleRowKeys.isEmpty()) {
        logger.info("Not splitting source {} because no sample row keys are available.", this);
        return Collections.singletonList(this);
      }

      logger.info(
          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
          desiredBundleSizeBytes,
          sampleRowKeys.size(),
          sampleRowKeys.get(0));

      // Loop through all sampled responses and generate splits from the ones that overlap the
      // scan range. The main complication is that we must track the end range of the previous
      // sample to generate good ranges.
      ByteKey lastEndKey = ByteKey.EMPTY;
      long lastOffset = 0;
      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
      for (SampleRowKeysResponse response : sampleRowKeys) {
        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
        long responseOffset = response.getOffsetBytes();
        // Samples are expected to arrive ordered by byte offset within the table.
        checkState(
            responseOffset >= lastOffset,
            "Expected response byte offset %s to come after the last offset %s",
            responseOffset,
            lastOffset);

        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
          // This region does not overlap the scan, so skip it.
          lastOffset = responseOffset;
          lastEndKey = responseEndKey;
          continue;
        }

        // Calculate the beginning of the split as the larger of startKey and the end of the last
        // split. Unspecified start is smallest key so is correctly treated as earliest key.
        ByteKey splitStartKey = lastEndKey;
        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
          splitStartKey = range.getStartKey();
        }

        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
        // that range.containsKey handles the case when range.getEndKey() is empty.
        ByteKey splitEndKey = responseEndKey;
        if (!range.containsKey(splitEndKey)) {
          splitEndKey = range.getEndKey();
        }

        // We know this region overlaps the desired key range, and we know a rough estimate of its
        // size. Split the key range into bundle-sized chunks and then add them all as splits.
        long sampleSizeBytes = responseOffset - lastOffset;
        List<BigtableSource> subSplits =
            splitKeyRangeIntoBundleSizedSubranges(
                sampleSizeBytes,
                desiredBundleSizeBytes,
                ByteKeyRange.of(splitStartKey, splitEndKey));
        splits.addAll(subSplits);

        // Move to the next region.
        lastEndKey = responseEndKey;
        lastOffset = responseOffset;
      }

      // We must add one more region after the end of the samples if both these conditions hold:
      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
      if (!lastEndKey.isEmpty()
          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
      }

      // NOTE(review): ret.get(0) assumes at least one split was generated; appears to hold
      // whenever sampleRowKeys is non-empty and the scan range is valid — confirm.
      List<BigtableSource> ret = splits.build();
      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
      return ret;
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
      // Delegate to testable helper. The result is cached in estimatedSizeBytes.
      if (estimatedSizeBytes == null) {
        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
      }
      return estimatedSizeBytes;
    }

    /**
     * Computes the estimated size in bytes based on the total size of all samples that overlap
     * the key range this source will scan.
     */
    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
      long estimatedSizeBytes = 0;
      long lastOffset = 0;
      ByteKey currentStartKey = ByteKey.EMPTY;
      // Compute the total estimated size as the size of each sample that overlaps the scan range.
      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
      // filter or to sample on a given key range.
      for (SampleRowKeysResponse response : samples) {
        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
        long currentOffset = response.getOffsetBytes();
        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
          // Skip an empty region.
          lastOffset = currentOffset;
          continue;
        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
          estimatedSizeBytes += currentOffset - lastOffset;
        }
        currentStartKey = currentEndKey;
        lastOffset = currentOffset;
      }
      return estimatedSizeBytes;
    }

    /**
     * Cloud Bigtable returns query results ordered by key.
     */
    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
      return true;
    }

    @Override
    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
      return new BigtableReader(this, service);
    }

    @Override
    public void validate() {
      // NOTE(review): tableId is declared @Nullable but dereferenced here; callers appear to
      // always supply a non-null id via Read.apply — confirm.
      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
    }

    @Override
    public Coder<Row> getDefaultOutputCoder() {
      return Proto2Coder.of(Row.class);
    }

    /** Helper that splits the specified range in this source into bundles. */
    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
      // Catch the trivial cases. Split is small enough already, or this is the last region.
      logger.debug(
          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
          sampleSizeBytes,
          desiredBundleSizeBytes);
      if (sampleSizeBytes <= desiredBundleSizeBytes) {
        return Collections.singletonList(
            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
      }

      checkArgument(
          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
      checkArgument(
          desiredBundleSizeBytes > 0,
          "Desired bundle size %s bytes must be greater than 0.",
          desiredBundleSizeBytes);

      // Split the range into splitCount equal-width key subranges; each split gets an equal
      // share of the sampled byte size.
      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
      List<ByteKey> splitKeys = range.split(splitCount);
      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
      Iterator<ByteKey> keys = splitKeys.iterator();
      ByteKey prev = keys.next();
      while (keys.hasNext()) {
        ByteKey next = keys.next();
        splits.add(
            this
                .withStartKey(prev)
                .withEndKey(next)
                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
        prev = next;
      }
      return splits.build();
    }

    /** Returns the key range scanned by this source. */
    public ByteKeyRange getRange() {
      return range;
    }

    /** Returns the server-side row filter, or null if none. */
    public RowFilter getRowFilter() {
      return filter;
    }

    /** Returns the id of the table scanned by this source. */
    public String getTableId() {
      return tableId;
    }
  }
-
- private static class BigtableReader extends BoundedReader<Row> {
- // Thread-safety: source is protected via synchronization and is only accessed or modified
- // inside a synchronized block (or constructor, which is the same).
- private BigtableSource source;
- private BigtableService service;
- private BigtableService.Reader reader;
- private final ByteKeyRangeTracker rangeTracker;
- private long recordsReturned;
-
- public BigtableReader(BigtableSource source, BigtableService service) {
- this.source = source;
- this.service = service;
- rangeTracker = ByteKeyRangeTracker.of(source.getRange());
- }
-
- @Override
- public boolean start() throws IOException {
- reader = service.createReader(getCurrentSource());
- boolean hasRecord =
- reader.start()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
- if (hasRecord) {
- ++recordsReturned;
- }
- return hasRecord;
- }
-
- @Override
- public synchronized BigtableSource getCurrentSource() {
- return source;
- }
-
- @Override
- public boolean advance() throws IOException {
- boolean hasRecord =
- reader.advance()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
- if (hasRecord) {
- ++recordsReturned;
- }
- return hasRecord;
- }
-
- @Override
- public Row getCurrent() throws NoSuchElementException {
- return reader.getCurrentRow();
- }
-
- @Override
- public void close() throws IOException {
- logger.info("Closing reader after reading {} records.", recordsReturned);
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- @Override
- public final Double getFractionConsumed() {
- return rangeTracker.getFractionConsumed();
- }
-
- @Override
- public final synchronized BigtableSource splitAtFraction(double fraction) {
- ByteKey splitKey;
- try {
- splitKey = source.getRange().interpolateKey(fraction);
- } catch (IllegalArgumentException e) {
- logger.info("%s: Failed to interpolate key for fraction %s.", source.getRange(), fraction);
- return null;
- }
- logger.debug(
- "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
- if (!rangeTracker.trySplitAtPosition(splitKey)) {
- return null;
- }
- BigtableSource primary = source.withEndKey(splitKey);
- BigtableSource residual = source.withStartKey(splitKey);
- this.source = primary;
- return residual;
- }
- }
-
- private static class Sink
- extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-
- public Sink(String tableId, BigtableService bigtableService) {
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = checkNotNull(bigtableService, "bigtableService");
- }
-
- public String getTableId() {
- return tableId;
- }
-
- public BigtableService getBigtableService() {
- return bigtableService;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Sink.class)
- .add("bigtableService", bigtableService)
- .add("tableId", tableId)
- .toString();
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- private final String tableId;
- private final BigtableService bigtableService;
-
- @Override
- public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
- PipelineOptions options) {
- return new BigtableWriteOperation(this);
- }
-
- /** Does nothing, as it is redundant with {@link Write#validate}. */
- @Override
- public void validate(PipelineOptions options) {}
- }
-
- private static class BigtableWriteOperation
- extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
- private final Sink sink;
-
- public BigtableWriteOperation(Sink sink) {
- this.sink = sink;
- }
-
- @Override
- public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
- throws Exception {
- return new BigtableWriter(this);
- }
-
- @Override
- public void initialize(PipelineOptions options) {}
-
- @Override
- public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
- long count = 0;
- for (Long value : writerResults) {
- value += count;
- }
- logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink);
- }
-
- @Override
- public Sink getSink() {
- return sink;
- }
-
- @Override
- public Coder<Long> getWriterResultCoder() {
- return VarLongCoder.of();
- }
- }
-
  /**
   * Per-bundle writer that submits each row mutation to Cloud Bigtable asynchronously and
   * collects asynchronous failures in a queue, surfacing them on the next {@link #write} or on
   * {@link #close}.
   */
  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
    private final BigtableWriteOperation writeOperation;
    private final Sink sink;
    private BigtableService.Writer bigtableWriter;
    private long recordsWritten;
    // Populated by async callbacks (possibly other threads); drained only by checkForFailures().
    private final ConcurrentLinkedQueue<BigtableWriteException> failures;

    public BigtableWriter(BigtableWriteOperation writeOperation) {
      this.writeOperation = writeOperation;
      this.sink = writeOperation.getSink();
      this.failures = new ConcurrentLinkedQueue<>();
    }

    @Override
    public void open(String uId) throws Exception {
      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
      recordsWritten = 0;
    }

    /**
     * If any write has asynchronously failed, fail the bundle with a useful error.
     */
    private void checkForFailures() throws IOException {
      // Note that this function is never called by multiple threads and is the only place that
      // we remove from failures, so this code is safe.
      if (failures.isEmpty()) {
        return;
      }

      // Include at most the first 10 failures in the exception message.
      StringBuilder logEntry = new StringBuilder();
      int i = 0;
      for (; i < 10 && !failures.isEmpty(); ++i) {
        BigtableWriteException exc = failures.remove();
        logEntry.append("\n").append(exc.getMessage());
        if (exc.getCause() != null) {
          logEntry.append(": ").append(exc.getCause().getMessage());
        }
      }
      String message =
          String.format(
              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
              i + failures.size(),
              i,
              logEntry.toString());
      logger.error(message);
      throw new IOException(message);
    }

    /**
     * Submits one row transaction asynchronously; a callback records any failure for the next
     * checkForFailures() call. Throws if a previous async write already failed.
     */
    @Override
    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
      checkForFailures();
      Futures.addCallback(
          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
      ++recordsWritten;
    }

    /**
     * Flushes outstanding writes, rethrows any async failures, and returns the record count as
     * this bundle's writer result.
     */
    @Override
    public Long close() throws Exception {
      bigtableWriter.close();
      bigtableWriter = null;
      checkForFailures();
      logger.info("Wrote {} records", recordsWritten);
      return recordsWritten;
    }

    @Override
    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
      return writeOperation;
    }

    /** Records a failed async write (with its record) into the failures queue. */
    private class WriteExceptionCallback implements FutureCallback<Empty> {
      private final KV<ByteString, Iterable<Mutation>> value;

      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
        this.value = value;
      }

      @Override
      public void onFailure(Throwable cause) {
        failures.add(new BigtableWriteException(value, cause));
      }

      @Override
      public void onSuccess(Empty produced) {}
    }
  }
-
- /**
- * An exception that puts information about the failed record being written in its message.
- */
- static class BigtableWriteException extends IOException {
- public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
- super(
- String.format(
- "Error mutating row %s with mutations %s",
- record.getKey().toStringUtf8(),
- record.getValue()),
- cause);
- }
- }
-
- /**
- * A helper function to produce a Cloud Bigtable user agent string.
- */
- private static String getUserAgent() {
- String javaVersion = System.getProperty("java.specification.version");
- DataflowReleaseInfo info = DataflowReleaseInfo.getReleaseInfo();
- return String.format(
- "%s/%s (%s); %s",
- info.getName(),
- info.getVersion(),
- javaVersion,
- "0.2.3" /* TODO get Bigtable client version directly from jar. */);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index 0c47f65..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An interface for real or fake implementations of Cloud Bigtable.
- */
-interface BigtableService extends Serializable {
-
- /**
- * The interface of a class that can write to Cloud Bigtable.
- */
- interface Writer {
- /**
- * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
- * row key to be mutated and the iterable of mutations represent the changes to be made to the
- * row.
- *
- * @throws IOException if there is an error submitting the write.
- */
- ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
- throws IOException;
-
- /**
- * Closes the writer.
- *
- * @throws IOException if any writes did not succeed
- */
- void close() throws IOException;
- }
-
- /**
- * The interface of a class that reads from Cloud Bigtable.
- */
- interface Reader {
- /**
- * Reads the first element (including initialization, such as opening a network connection) and
- * returns true if an element was found.
- */
- boolean start() throws IOException;
-
- /**
- * Attempts to read the next element, and returns true if an element has been read.
- */
- boolean advance() throws IOException;
-
- /**
- * Closes the reader.
- *
- * @throws IOException if there is an error.
- */
- void close() throws IOException;
-
- /**
- * Returns the last row read by a successful start() or advance(), or throws if there is no
- * current row because the last such call was unsuccessful.
- */
- Row getCurrentRow() throws NoSuchElementException;
- }
-
- /**
- * Returns {@code true} if the table with the give name exists.
- */
- boolean tableExists(String tableId) throws IOException;
-
- /**
- * Returns a {@link Reader} that will read from the specified source.
- */
- Reader createReader(BigtableSource source) throws IOException;
-
- /**
- * Returns a {@link Writer} that will write to the specified table.
- */
- Writer openForWriting(String tableId) throws IOException;
-
- /**
- * Returns a set of row keys sampled from the underlying table. These contain information about
- * the distribution of keys within the table.
- */
- List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 9f32022..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- */
-class BigtableServiceImpl implements BigtableService {
- private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
-
- public BigtableServiceImpl(BigtableOptions options) {
- this.options = options;
- }
-
- private final BigtableOptions options;
-
- @Override
- public BigtableWriterImpl openForWriting(String tableId) throws IOException {
- BigtableSession session = new BigtableSession(options);
- String tableName = options.getClusterName().toTableNameStr(tableId);
- return new BigtableWriterImpl(session, tableName);
- }
-
- @Override
- public boolean tableExists(String tableId) throws IOException {
- if (!BigtableSession.isAlpnProviderEnabled()) {
- logger.info(
- "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
- + " configured.",
- tableId,
- options);
- return true;
- }
-
- try (BigtableSession session = new BigtableSession(options)) {
- GetTableRequest getTable =
- GetTableRequest.newBuilder()
- .setName(options.getClusterName().toTableNameStr(tableId))
- .build();
- session.getTableAdminClient().getTable(getTable);
- return true;
- } catch (StatusRuntimeException e) {
- if (e.getStatus().getCode() == Code.NOT_FOUND) {
- return false;
- }
- String message =
- String.format(
- "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
- logger.error(message, e);
- throw new IOException(message, e);
- }
- }
-
- private class BigtableReaderImpl implements Reader {
- private BigtableSession session;
- private final BigtableSource source;
- private ResultScanner<Row> results;
- private Row currentRow;
-
- public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
- this.session = session;
- this.source = source;
- }
-
- @Override
- public boolean start() throws IOException {
- RowRange range =
- RowRange.newBuilder()
- .setStartKey(source.getRange().getStartKey().getValue())
- .setEndKey(source.getRange().getEndKey().getValue())
- .build();
- ReadRowsRequest.Builder requestB =
- ReadRowsRequest.newBuilder()
- .setRowRange(range)
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
- }
- results = session.getDataClient().readRows(requestB.build());
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return (currentRow != null);
- }
-
- @Override
- public void close() throws IOException {
- // Goal: by the end of this function, both results and session are null and closed,
- // independent of what errors they throw or prior state.
-
- if (session == null) {
- // Only possible when previously closed, so we know that results is also null.
- return;
- }
-
- // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
- // the Closer, but we can use the Closer to simplify the error handling.
- try (Closer closer = Closer.create()) {
- if (results != null) {
- closer.register(results);
- results = null;
- }
-
- session.close();
- } finally {
- session = null;
- }
- }
-
- @Override
- public Row getCurrentRow() throws NoSuchElementException {
- if (currentRow == null) {
- throw new NoSuchElementException();
- }
- return currentRow;
- }
- }
-
- private static class BigtableWriterImpl implements Writer {
- private BigtableSession session;
- private AsyncExecutor executor;
- private final MutateRowRequest.Builder partialBuilder;
-
- public BigtableWriterImpl(BigtableSession session, String tableName) {
- this.session = session;
- this.executor =
- new AsyncExecutor(
- session.getDataClient(),
- new HeapSizeManager(
- AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
- AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
- partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (executor != null) {
- executor.flush();
- executor = null;
- }
- } finally {
- if (session != null) {
- session.close();
- session = null;
- }
- }
- }
-
- @Override
- public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
- throws IOException {
- MutateRowRequest r =
- partialBuilder
- .clone()
- .setRowKey(record.getKey())
- .addAllMutations(record.getValue())
- .build();
- try {
- return executor.mutateRowAsync(r);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Write interrupted", e);
- }
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects
- .toStringHelper(BigtableServiceImpl.class)
- .add("options", options)
- .toString();
- }
-
- @Override
- public Reader createReader(BigtableSource source) throws IOException {
- BigtableSession session = new BigtableSession(options);
- return new BigtableReaderImpl(session, source);
- }
-
- @Override
- public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
- try (BigtableSession session = new BigtableSession(options)) {
- SampleRowKeysRequest request =
- SampleRowKeysRequest.newBuilder()
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
- .build();
- return session.getDataClient().sampleRowKeys(request);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
deleted file mode 100644
index 553f46c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
/**
 * Defines transforms for reading from and writing to Google Cloud Bigtable.
 *
 * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
 */
package com.google.cloud.dataflow.sdk.io.bigtable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
deleted file mode 100644
index 5f0050d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
/**
 * Defines transforms for reading and writing common storage formats, including
 * {@link com.google.cloud.dataflow.sdk.io.AvroIO},
 * {@link com.google.cloud.dataflow.sdk.io.BigQueryIO}, and
 * {@link com.google.cloud.dataflow.sdk.io.TextIO}.
 *
 * <p>The classes in this package provide {@code Read} transforms that create PCollections
 * from existing storage:
 * <pre>{@code
 * PCollection<TableRow> inputData = pipeline.apply(
 *     BigQueryIO.Read.named("Read")
 *         .from("clouddataflow-readonly:samples.weather_stations"));
 * }</pre>
 * and {@code Write} transforms that persist PCollections to external storage:
 * <pre> {@code
 * PCollection<Integer> numbers = ...;
 * numbers.apply(TextIO.Write.named("WriteNumbers")
 *     .to("gs://my_bucket/path/to/numbers"));
 * } </pre>
 */
package com.google.cloud.dataflow.sdk.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
deleted file mode 100644
index 5b9a003..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ByteString.ByteIterator;
-
-import java.io.Serializable;
-
-/**
- * A class representing a key consisting of an array of bytes. Arbitrary-length
- * {@code byte[]} keys are typical in key-value stores such as Google Cloud Bigtable.
- *
- * <p>Instances of {@link ByteKey} are immutable.
- *
- * <p>{@link ByteKey} implements {@link Comparable Comparable<ByteKey>} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the successor
- * to a key is the same key with an additional 0 byte appended; and keys have unbounded size.
- *
- * <p>Note that the empty {@link ByteKey} compares smaller than all other keys, but some systems
- * have the semantic that when an empty {@link ByteKey} is used as an upper bound, it represents
- * the largest possible key. In these cases, implementors should use {@link #isEmpty} to test
- * whether an upper bound key is empty.
- */
-public final class ByteKey implements Comparable<ByteKey>, Serializable {
- /** An empty key. */
- public static final ByteKey EMPTY = ByteKey.of();
-
- /**
- * Creates a new {@link ByteKey} backed by the specified {@link ByteString}.
- */
- public static ByteKey of(ByteString value) {
- return new ByteKey(value);
- }
-
- /**
- * Creates a new {@link ByteKey} backed by a copy of the specified {@code byte[]}.
- *
- * <p>Makes a copy of the underlying array.
- */
- public static ByteKey copyFrom(byte[] bytes) {
- return of(ByteString.copyFrom(bytes));
- }
-
- /**
- * Creates a new {@link ByteKey} backed by a copy of the specified {@code int[]}. This method is
- * primarily used as a convenience to create a {@link ByteKey} in code without casting down to
- * signed Java {@link Byte bytes}:
- *
- * <pre>{@code
- * ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);
- * }</pre>
- *
- * <p>Makes a copy of the input.
- */
- public static ByteKey of(int... bytes) {
- byte[] ret = new byte[bytes.length];
- for (int i = 0; i < bytes.length; ++i) {
- ret[i] = (byte) (bytes[i] & 0xff);
- }
- return ByteKey.copyFrom(ret);
- }
-
- /**
- * Returns an immutable {@link ByteString} representing this {@link ByteKey}.
- *
- * <p>Does not copy.
- */
- public ByteString getValue() {
- return value;
- }
-
- /**
- * Returns a newly-allocated {@code byte[]} representing this {@link ByteKey}.
- *
- * <p>Copies the underlying {@code byte[]}.
- */
- public byte[] getBytes() {
- return value.toByteArray();
- }
-
- /**
- * Returns {@code true} if the {@code byte[]} backing this {@link ByteKey} is of length 0.
- */
- public boolean isEmpty() {
- return value.isEmpty();
- }
-
- /**
- * {@link ByteKey} implements {@link Comparable Comparable<ByteKey>} by comparing the
- * arrays in lexicographic order. The smallest {@link ByteKey} is a zero-length array; the
- * successor to a key is the same key with an additional 0 byte appended; and keys have unbounded
- * size.
- */
- @Override
- public int compareTo(ByteKey other) {
- checkNotNull(other, "other");
- ByteIterator thisIt = value.iterator();
- ByteIterator otherIt = other.value.iterator();
- while (thisIt.hasNext() && otherIt.hasNext()) {
- // (byte & 0xff) converts [-128,127] bytes to [0,255] ints.
- int cmp = (thisIt.nextByte() & 0xff) - (otherIt.nextByte() & 0xff);
- if (cmp != 0) {
- return cmp;
- }
- }
- // If we get here, the prefix of both arrays is equal up to the shorter array. The array with
- // more bytes is larger.
- return value.size() - other.value.size();
- }
-
- ////////////////////////////////////////////////////////////////////////////////////
- private final ByteString value;
-
- private ByteKey(ByteString value) {
- this.value = value;
- }
-
- /** Array used as a helper in {@link #toString}. */
- private static final char[] HEX =
- new char[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
-
- // Prints the key as a string "[deadbeef]".
- @Override
- public String toString() {
- char[] encoded = new char[2 * value.size() + 2];
- encoded[0] = '[';
- int cnt = 1;
- ByteIterator iterator = value.iterator();
- while (iterator.hasNext()) {
- byte b = iterator.nextByte();
- encoded[cnt] = HEX[(b & 0xF0) >>> 4];
- ++cnt;
- encoded[cnt] = HEX[b & 0xF];
- ++cnt;
- }
- encoded[cnt] = ']';
- return new String(encoded);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof ByteKey)) {
- return false;
- }
- ByteKey other = (ByteKey) o;
- return (other.value.size() == value.size()) && this.compareTo(other) == 0;
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-}