Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:04 UTC
[40/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
deleted file mode 100644
index 0b78b83..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import org.joda.time.Instant;
-
-import java.util.UUID;
-
-/**
- * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
- * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}. In the case of an empty PCollection, only
- * the global initialization and finalization will be performed.
- *
- * <p>Currently, only batch workflows can contain Write transforms.
- *
- * <p>Example usage:
- *
- * <p>{@code p.apply(Write.to(new MySink(...)));}
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
- /**
- * Creates a Write transform that writes to the given Sink.
- */
- public static <T> Bound<T> to(Sink<T> sink) {
- return new Bound<>(sink);
- }
-
- /**
- * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for
- * documentation about writing to Sinks.
- */
- public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
- private final Sink<T> sink;
-
- private Bound(Sink<T> sink) {
- this.sink = sink;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- PipelineOptions options = input.getPipeline().getOptions();
- sink.validate(options);
- return createWrite(input, sink.createWriteOperation(options));
- }
-
- /**
- * Returns the {@link Sink} associated with this PTransform.
- */
- public Sink<T> getSink() {
- return sink;
- }
-
- /**
- * A write is performed as a sequence of three {@link ParDo}s.
- *
- * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
- * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
- * called. The output of this ParDo is a singleton PCollection
- * containing the WriteOperation.
- *
- * <p>This singleton collection containing the WriteOperation is then used as a side input to a
- * ParDo over the PCollection of elements to write. In this bundle-writing phase,
- * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
- * {@link DoFn#finishBundle}, respectively, and the {@link Writer#write} method is called for
- * every element in the bundle. The output of this ParDo is a PCollection of <i>writer
- * result</i> objects (see {@link Sink} for a description of writer results), one per bundle.
- *
- * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
- * the collection of writer results as a side-input. In this ParDo,
- * {@link WriteOperation#finalize} is called to finalize the write.
- *
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
- * before the exception that caused the write to fail is propagated, and the write result will
- * be discarded.
- *
- * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
- * deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter phases.
- * However, the WriteOperation is not serialized after the bundle-writing phase, so changes
- * made while writing bundles are not visible during finalization; implementations should
- * therefore guarantee that {@link WriteOperation#createWriter} does not mutate WriteOperation.
- */
- private <WriteT> PDone createWrite(
- PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
- Pipeline p = input.getPipeline();
-
- // A coder to use for the WriteOperation.
- @SuppressWarnings("unchecked")
- Coder<WriteOperation<T, WriteT>> operationCoder =
- (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
- // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
- // the sink.
- PCollection<WriteOperation<T, WriteT>> operationCollection =
- p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));
-
- // Initialize the resource in a do-once ParDo on the WriteOperation.
- operationCollection = operationCollection
- .apply("Initialize", ParDo.of(
- new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- writeOperation.initialize(c.getPipelineOptions());
- // The WriteOperation is also the output of this ParDo, so it can have mutable
- // state.
- c.output(writeOperation);
- }
- }))
- .setCoder(operationCoder);
-
- // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
- final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
- operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
- // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
- // as a side input) and collect the results of the writes in a PCollection.
- // There is a dependency between this ParDo and the first (the WriteOperation PCollection
- // as a side input), so this will happen after the initial ParDo.
- PCollection<WriteT> results = input
- .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T, WriteT> writer = null;
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- }
- throw e;
- }
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- WriteT result = writer.close();
- // Output the result of the write.
- c.outputWithTimestamp(result, Instant.now());
- }
- }
- }).withSideInputs(writeOperationView))
- .setCoder(writeOperation.getWriterResultCoder())
- .apply(Window.<WriteT>into(new GlobalWindows()));
-
- final PCollectionView<Iterable<WriteT>> resultsView =
- results.apply(View.<WriteT>asIterable());
-
- // Finalize the write in another do-once ParDo on the singleton collection containing the
- // Writer. The results from the per-bundle writes are given as an Iterable side input.
- // The WriteOperation's state is the same as after its initialization in the first do-once
- // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
- // collection as a side input), so it will happen after the parallel write.
- @SuppressWarnings("unused")
- final PCollection<Integer> done = operationCollection
- .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- Iterable<WriteT> results = c.sideInput(resultsView);
- WriteOperation<T, WriteT> writeOperation = c.element();
- writeOperation.finalize(results, c.getPipelineOptions());
- }
- }).withSideInputs(resultsView));
- return PDone.in(input.getPipeline());
- }
- }
-}
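
A minimal usage sketch for the Write transform deleted above, assuming a user-defined
MySink extending Sink<String> (MySink is a hypothetical placeholder, not part of the SDK):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.Write;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;

    public class WriteExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // The three-phase contract described in createWrite() runs behind this
        // single apply: initialize, per-bundle writes, then finalize.
        p.apply(Create.of("alpha", "beta", "gamma"))
         .apply(Write.to(new MySink())); // MySink: hypothetical Sink<String>
        p.run();
      }
    }

Per the Javadoc above, this is valid only in batch pipelines, and an empty input still
triggers the sink's global initialization and finalization.
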
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
deleted file mode 100644
index b728c0a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Preconditions;
-
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
- * records from JAXB-annotated classes to a single file location.
- *
- * <p>Given a PCollection containing records of type T that can be marshalled to XML elements,
- * this Sink will produce output files, each consisting of a single root element that contains
- * the records written to that file.
- *
- * <p>XML Sinks are created with a base filename to write to, a root element name that will be used
- * for the root element of the output files, and a class to bind to an XML element. This class
- * will be used in the marshalling of records in an input PCollection to their XML representation
- * and must be able to be bound using JAXB annotations (checked at pipeline construction time).
- *
- * <p>XML Sinks can be written to using the {@link Write} transform:
- *
- * <pre>
- * p.apply(Write.to(
- * XmlSink.ofRecordClass(Type.class)
- * .withRootElementName(root_element)
- * .toFilenamePrefix(output_filename)));
- * </pre>
- *
- * <p>For example, consider the following class with JAXB annotations:
- *
- * <pre>
- * {@literal @}XmlRootElement(name = "word_count_result")
- * {@literal @}XmlType(propOrder = {"word", "frequency"})
- * public class WordFrequency {
- * private String word;
- * private long frequency;
- *
- * public WordFrequency() { }
- *
- * public WordFrequency(String word, long frequency) {
- * this.word = word;
- * this.frequency = frequency;
- * }
- *
- * public void setWord(String word) {
- * this.word = word;
- * }
- *
- * public void setFrequency(long frequency) {
- * this.frequency = frequency;
- * }
- *
- * public long getFrequency() {
- * return frequency;
- * }
- *
- * public String getWord() {
- * return word;
- * }
- * }
- * </pre>
- *
- * <p>The following will produce XML output with a root element named "words" from a PCollection of
- * WordFrequency objects:
- * <pre>
- * p.apply(Write.to(
- * XmlSink.ofRecordClass(WordFrequency.class)
- * .withRootElement("words")
- * .toFilenamePrefix(output_file)));
- * </pre>
- *
- * <p>The output of which will look like:
- * <pre>
- * {@code
- * <words>
- *
- * <word_count_result>
- * <word>decreased</word>
- * <frequency>1</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>War</word>
- * <frequency>4</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>empress'</word>
- * <frequency>14</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>stoops</word>
- * <frequency>6</frequency>
- * </word_count_result>
- *
- * ...
- * </words>
- * }</pre>
- */
-// CHECKSTYLE.ON: JavadocStyle
-@SuppressWarnings("checkstyle:javadocstyle")
-public class XmlSink {
- protected static final String XML_EXTENSION = "xml";
-
- /**
- * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root
- * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link
- * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively.
- */
- public static Bound<?> write() {
- return new Bound<>(null, null, null);
- }
-
- /**
- * Returns an XmlSink that writes objects as XML entities.
- *
- * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n
- * is the number of output bundles that the Dataflow service divides the output into.
- *
- * @param klass the class of the elements to write.
- * @param rootElementName the enclosing root element.
- * @param baseOutputFilename the output filename prefix.
- */
- public static <T> Bound<T> writeOf(
- Class<T> klass, String rootElementName, String baseOutputFilename) {
- return new Bound<>(klass, rootElementName, baseOutputFilename);
- }
-
- /**
- * A {@link FileBasedSink} that writes objects as XML elements.
- */
- public static class Bound<T> extends FileBasedSink<T> {
- final Class<T> classToBind;
- final String rootElementName;
-
- private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) {
- super(baseOutputFilename, XML_EXTENSION);
- this.classToBind = classToBind;
- this.rootElementName = rootElementName;
- }
-
- /**
- * Returns an XmlSink that writes objects of the specified class as XML elements.
- *
- * <p>The specified class must be able to be used to create a JAXB context.
- */
- public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
- }
-
- /**
- * Returns an XmlSink that writes to files with the given prefix.
- *
- * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
- * the number of output bundles that the Dataflow service divides the output into.
- */
- public Bound<T> toFilenamePrefix(String baseOutputFilename) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
- }
-
- /**
- * Returns an XmlSink that writes XML files with an enclosing root element of the
- * supplied name.
- */
- public Bound<T> withRootElement(String rootElementName) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
- }
-
- /**
- * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
- * been set and that the class can be bound in a JAXB context.
- */
- @Override
- public void validate(PipelineOptions options) {
- Preconditions.checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
- Preconditions.checkNotNull(rootElementName, "Missing a root element name.");
- Preconditions.checkNotNull(baseOutputFilename, "Missing a filename to write to.");
- try {
- JAXBContext.newInstance(classToBind);
- } catch (JAXBException e) {
- throw new RuntimeException("Error binding classes to a JAXB Context.", e);
- }
- }
-
- /**
- * Creates an {@link XmlWriteOperation}.
- */
- @Override
- public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
- return new XmlWriteOperation<>(this);
- }
- }
-
- /**
- * {@link Sink.WriteOperation} for XML {@link Sink}s.
- */
- protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
- public XmlWriteOperation(XmlSink.Bound<T> sink) {
- super(sink);
- }
-
- /**
- * Creates an {@link XmlWriter} with a marshaller for the type it will write.
- */
- @Override
- public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
- JAXBContext context;
- Marshaller marshaller;
- context = JAXBContext.newInstance(getSink().classToBind);
- marshaller = context.createMarshaller();
- marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
- marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
- marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
- return new XmlWriter<>(this, marshaller);
- }
-
- /**
- * Return the XmlSink.Bound for this write operation.
- */
- @Override
- public XmlSink.Bound<T> getSink() {
- return (XmlSink.Bound<T>) super.getSink();
- }
- }
-
- /**
- * A {@link Sink.Writer} that can write objects as XML elements.
- */
- protected static final class XmlWriter<T> extends FileBasedWriter<T> {
- final Marshaller marshaller;
- private OutputStream os = null;
-
- public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
- super(writeOperation);
- this.marshaller = marshaller;
- }
-
- /**
- * Creates the output stream that elements will be written to.
- */
- @Override
- protected void prepareWrite(WritableByteChannel channel) throws Exception {
- os = Channels.newOutputStream(channel);
- }
-
- /**
- * Writes the root element opening tag.
- */
- @Override
- protected void writeHeader() throws Exception {
- String rootElementName = getWriteOperation().getSink().rootElementName;
- os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
- }
-
- /**
- * Writes the root element closing tag.
- */
- @Override
- protected void writeFooter() throws Exception {
- String rootElementName = getWriteOperation().getSink().rootElementName;
- os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
- }
-
- /**
- * Writes a value to the stream.
- */
- @Override
- public void write(T value) throws Exception {
- marshaller.marshal(value, os);
- }
-
- /**
- * Return the XmlWriteOperation this write belongs to.
- */
- @Override
- public XmlWriteOperation<T> getWriteOperation() {
- return (XmlWriteOperation<T>) super.getWriteOperation();
- }
- }
-}
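
As an end-to-end sketch, the WordFrequency class from the Javadoc above can be written
through this sink using the writeOf() factory shown in the deleted file; the output
prefix /tmp/wordfreq is illustrative only, and JAXBCoder is used since WordFrequency
is JAXB-annotated:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
    import com.google.cloud.dataflow.sdk.io.Write;
    import com.google.cloud.dataflow.sdk.io.XmlSink;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;

    public class XmlSinkExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        p.apply(Create.of(new WordFrequency("decreased", 1),
                          new WordFrequency("War", 4))
                      .withCoder(JAXBCoder.of(WordFrequency.class)))
         .apply(Write.to(XmlSink.writeOf(
             WordFrequency.class,  // JAXB-annotated record class
             "words",              // root element name
             "/tmp/wordfreq")));   // illustrative output filename prefix
        p.run();
      }
    }

Note that the JAXB binding is checked at pipeline construction time, when the Write
transform calls Bound.validate().
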
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
deleted file mode 100644
index 1ead391..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.common.base.Preconditions;
-
-import org.codehaus.stax2.XMLInputFactory2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.ValidationEvent;
-import javax.xml.bind.ValidationEventHandler;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@code PCollection} of a given type. A Dataflow read transform can be
- * created by passing an {@code XmlSource} object to {@code Read.from()}. See the example
- * below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>The XML document should contain a single root element whose children consist entirely of
- * record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Users must also provide a JAXB-annotated Java class that can be used to convert records to
- * and from Java objects using JAXB marshalling/unmarshalling mechanisms. Reading the source
- * will generate a {@code PCollection} of the given JAXB-annotated Java type. Optionally, users
- * may provide a minimum size for the bundles created from the source.
- *
- * <p>The following example shows how to read from {@link XmlSource} in a Dataflow pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString())
- * .withRootElement("root")
- * .withRecordElement("record")
- * .withRecordClass(Record.class);
- * PCollection<String> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- * <li>Include a compatible implementation on the classpath at run-time,
- * such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies are declared as optional in the Maven sdk/pom.xml file of
- * Google Cloud Dataflow.
- *
- * <p><h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- * {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
-public class XmlSource<T> extends FileBasedSource<T> {
-
- private static final String XML_VERSION = "1.1";
- private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
- private final String rootElement;
- private final String recordElement;
- private final Class<T> recordClass;
-
- /**
- * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
- * pattern. Each XML file should be of the form defined in {@link XmlSource}.
- */
- public static <T> XmlSource<T> from(String fileOrPatternSpec) {
- return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
- }
-
- /**
- * Sets the name of the root element of the XML document. This will be used to create a valid
- * starting root element when initiating a bundle of records created from an XML document. This
- * is a required parameter.
- */
- public XmlSource<T> withRootElement(String rootElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets the name of the record element of the XML document. This will be used to determine the
- * offset of the first record of a bundle created from the XML document. This is a required parameter.
- */
- public XmlSource<T> withRecordElement(String recordElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
- * will be used when unmarshalling record objects from the XML file. This is a required
- * parameter.
- */
- public XmlSource<T> withRecordClass(Class<T> recordClass) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
-
- /**
- * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
- * to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
- * parameter.
- */
- public XmlSource<T> withMinBundleSize(long minBundleSize) {
- return new XmlSource<>(
- getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
- String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
- String rootElement, String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize, startOffset, endOffset);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
- }
-
- @Override
- protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
- return new XmlSource<T>(
- fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
- }
-
- @Override
- protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
- return new XMLReader<T>(this);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return false;
- }
-
- @Override
- public void validate() {
- super.validate();
- Preconditions.checkNotNull(
- rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
- Preconditions.checkNotNull(
- recordElement,
- "recordElement is null. Use builder method withRecordElement() to set this.");
- Preconditions.checkNotNull(
- recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return JAXBCoder.of(recordClass);
- }
-
- public String getRootElement() {
- return rootElement;
- }
-
- public String getRecordElement() {
- return recordElement;
- }
-
- public Class<T> getRecordClass() {
- return recordClass;
- }
-
- /**
- * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
- * file should be of the form defined at {@link XmlSource}.
- *
- * <p>Timestamped values are currently unsupported; all values implicitly have the timestamp
- * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
- *
- * @param <T> Type of objects that will be read by the reader.
- */
- private static class XMLReader<T> extends FileBasedReader<T> {
- // The number of bytes read from the channel into memory when determining the starting offset
- // of the first record in a bundle. After matching the starting offset of the first record,
- // the remaining bytes read into this buffer and the bytes still unread from the channel are
- // used to create the XML parser.
- private static final int BUF_SIZE = 1024;
-
- // This should be the maximum number of bytes a character will encode to, for any encoding
- // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
- // four bytes.
- private static final int MAX_CHAR_BYTES = 4;
-
- // In order to support reading starting in the middle of an XML file, we construct an imaginary
- // well-formed document (a header and root tag followed by the contents of the input starting at
- // the record boundary) and feed it to the parser. Because of this, the offset reported by the
- // XML parser is not the same as offset in the original file. They differ by a constant amount:
- // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
- // Note that this is true only for files with single-byte characters.
- // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
- // reporting byte offsets of elements in the presence of multi-byte characters.
- private long parserBaseOffset = 0;
- private boolean readingStarted = false;
-
- // If true, the current bundle does not contain any records.
- private boolean emptyBundle = false;
-
- private Unmarshaller jaxbUnmarshaller = null;
- private XMLStreamReader parser = null;
-
- private T currentRecord = null;
-
- // Byte offset of the current record in the XML file provided when creating the source.
- private long currentByteOffset = 0;
-
- public XMLReader(XmlSource<T> source) {
- super(source);
-
- // Set up a JAXB Unmarshaller that can be used to unmarshal record objects.
- try {
- JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
- jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
- // Throw errors if validation fails. JAXB by default ignores validation errors.
- jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
- @Override
- public boolean handleEvent(ValidationEvent event) {
- throw new RuntimeException(event.getMessage(), event.getLinkedException());
- }
- });
- } catch (JAXBException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public synchronized XmlSource<T> getCurrentSource() {
- return (XmlSource<T>) super.getCurrentSource();
- }
-
- @Override
- protected void startReading(ReadableByteChannel channel) throws IOException {
- // This method determines the correct starting offset of the first record by reading bytes
- // from the ReadableByteChannel. This implementation does not need the channel to be a
- // SeekableByteChannel.
- // The method tries to determine the first record element in the byte channel. The first
- // record must start with the characters "<recordElement" where "recordElement" is the
- // record element of the XML document described above. For the match to be complete this
- // has to be followed by one of following.
- // * any whitespace character
- // * '>' character
- // * '/' character (to support empty records).
- //
- // After this match this method creates the XML parser for parsing the XML document,
- // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
- // by the contents of the channel starting from <recordElement. The <rootElement> tag may
- // never be closed.
-
- // This stores any bytes that should be used prior to the remaining bytes of the channel when
- // creating an XML parser object.
- ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
- // A dummy declaration and root for the document with proper XML version and encoding. Without
- // this, XML parsing may fail or may produce incorrect results.
-
- byte[] dummyStartDocumentBytes =
- ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
- + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
- preambleByteBuffer.write(dummyStartDocumentBytes);
- // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
- // method returns the offset and stores any bytes that should be used when creating the XML
- // parser in preambleByteBuffer.
- long offsetInFileOfRecordElement =
- getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
- if (offsetInFileOfRecordElement < 0) {
- // The bundle has no records, so mark this bundle as empty.
- emptyBundle = true;
- return;
- } else {
- byte[] preambleBytes = preambleByteBuffer.toByteArray();
- currentByteOffset = offsetInFileOfRecordElement;
- setUpXMLParser(channel, preambleBytes);
- parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
- }
- readingStarted = true;
- }
-
- // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
- // any bytes read past the starting offset of the next record into the preambleByteBuffer.
- // If a record is found, returns the starting offset of the record, otherwise
- // returns -1.
- private long getFirstOccurenceOfRecordElement(
- ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
- int byteIndexInRecordElementToMatch = 0;
- // Index of the byte in the string "<recordElement" to be matched
- // against the current byte from the stream.
- boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the
- // next character to confirm if this is a positive match.
- boolean fullyMatched = false; // If true, record element was fully matched.
-
- // This gives the offset of the byte currently being read. We do a '-1' here since we
- // increment this value at the beginning of the while loop below.
- long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
- long startingOffsetInFileOfCurrentMatch = -1;
- // If this is non-negative, currently there is a match in progress and this value gives the
- // starting offset of the match currently being conducted.
- boolean matchStarted = false; // If true, a match is currently in progress.
-
- // These two values are used to determine the character immediately following a match for
- // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
- byte[] charBytes = new byte[MAX_CHAR_BYTES];
- int charBytesFound = 0;
-
- ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
- byte[] recordStartBytes =
- ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
-
- outer: while (channel.read(buf) > 0) {
- buf.flip();
- while (buf.hasRemaining()) {
- offsetInFileOfCurrentByte++;
- byte b = buf.get();
- boolean reset = false;
- if (recordStartBytesMatched) {
- // We already matched "<recordElement" reading the next character to determine if this
- // is a positive match for a new record.
- charBytes[charBytesFound] = b;
- charBytesFound++;
- Character c = null;
- if (charBytesFound == charBytes.length) {
- CharBuffer charBuf = CharBuffer.allocate(1);
- InputStream charBufStream = new ByteArrayInputStream(charBytes);
- java.io.Reader reader =
- new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
- int read = reader.read();
- if (read <= 0) {
- return -1;
- }
- charBuf.flip();
- c = (char) read;
- } else {
- continue;
- }
-
- // A record start may take one of the following forms:
- // * "<recordElement<whitespace>..."
- // * "<recordElement>..."
- // * "<recordElement/..."
- if (Character.isWhitespace(c) || c == '>' || c == '/') {
- fullyMatched = true;
- // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
- // already read from the channel.
- preambleByteBuffer.write(recordStartBytes);
- preambleByteBuffer.write(charBytes);
- // Also add the rest of the current buffer to preambleByteBuffer.
- while (buf.hasRemaining()) {
- preambleByteBuffer.write(buf.get());
- }
- break outer;
- } else {
- // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
- ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
- newbuf.put(charBytes);
- offsetInFileOfCurrentByte -= charBytes.length;
- while (buf.hasRemaining()) {
- newbuf.put(buf.get());
- }
- newbuf.flip();
- buf = newbuf;
-
- // Ignore everything and try again starting from the current buffer.
- reset = true;
- }
- } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
- // Next byte matched.
- if (!matchStarted) {
- // Match was for the first byte, record the starting offset.
- matchStarted = true;
- startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
- }
- byteIndexInRecordElementToMatch++;
- } else {
- // Not a match. Ignore everything and try again starting at current point.
- reset = true;
- }
- if (reset) {
- // Clear variables and try to match starting from the next byte.
- byteIndexInRecordElementToMatch = 0;
- startingOffsetInFileOfCurrentMatch = -1;
- matchStarted = false;
- recordStartBytesMatched = false;
- charBytes = new byte[MAX_CHAR_BYTES];
- charBytesFound = 0;
- }
- if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
- // "<recordElement" matched. Need to still check next byte since this might be an
- // element that has "recordElement" as a prefix.
- recordStartBytesMatched = true;
- }
- }
- buf.clear();
- }
-
- if (!fullyMatched) {
- return -1;
- } else {
- return startingOffsetInFileOfCurrentMatch;
- }
- }
-
- private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
- try {
- // We use Woodstox because the StAX implementation provided by OpenJDK reports
- // character locations incorrectly. Note that Woodstox still currently reports *byte*
- // locations incorrectly when parsing documents that contain multi-byte characters.
- XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
- this.parser = xmlInputFactory.createXMLStreamReader(
- new SequenceInputStream(
- new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
- "UTF-8");
-
- // Current offset should be the offset before reading the record element.
- while (true) {
- int event = parser.next();
- if (event == XMLStreamConstants.START_ELEMENT) {
- String localName = parser.getLocalName();
- if (localName.equals(getCurrentSource().recordElement)) {
- break;
- }
- }
- }
- } catch (FactoryConfigurationError | XMLStreamException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- protected boolean readNextRecord() throws IOException {
- if (emptyBundle) {
- currentByteOffset = Long.MAX_VALUE;
- return false;
- }
- try {
- // Update current offset and check if the next value is the record element.
- currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
- while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
- parser.next();
- currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
- if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
- currentByteOffset = Long.MAX_VALUE;
- return false;
- }
- }
- JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
- currentRecord = jb.getValue();
- return true;
- } catch (JAXBException | XMLStreamException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (!readingStarted) {
- throw new NoSuchElementException();
- }
- return currentRecord;
- }
-
- @Override
- protected boolean isAtSplitPoint() {
- // Every record is at a split point.
- return true;
- }
-
- @Override
- protected long getCurrentOffset() {
- return currentByteOffset;
- }
- }
-}
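
A complementary read sketch, assuming files produced by the XmlSink example earlier in
this message (the glob /tmp/wordfreq*.xml and the bundle size are illustrative):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.Read;
    import com.google.cloud.dataflow.sdk.io.XmlSource;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class XmlSourceExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        XmlSource<WordFrequency> source =
            XmlSource.<WordFrequency>from("/tmp/wordfreq*.xml")
                .withRootElement("words")
                .withRecordElement("word_count_result")
                .withRecordClass(WordFrequency.class)
                .withMinBundleSize(8 * 1024); // optional; matches the default above
        PCollection<WordFrequency> records = p.apply(Read.from(source));
        p.run();
      }
    }

Remember the stax2-api and Woodstox dependencies noted in the Javadoc; without a
StAX2 implementation on the runtime classpath the source cannot construct its parser.
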
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 7d59b09..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,987 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.io.range.ByteKey;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
-import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source reads a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table")
- * .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- * BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
- private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
- /**
- * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
- * initialized with a
- * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
- * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
- * specifies which table to read. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}.
- */
- @Experimental
- public static Read read() {
- return new Read(null, "", null, null);
- }
-
- /**
- * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
- * initialized with a
- * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
- * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
- * specifies which table to write to.
- */
- @Experimental
- public static Write write() {
- return new Write(null, "", null);
- }
-
- /**
- * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
- * {@link BigtableIO} for more information.
- *
- * @see BigtableIO
- */
- @Experimental
- public static class Read extends PTransform<PBegin, PCollection<Row>> {
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "options");
- return withBigtableOptions(options.toBuilder());
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Clones the given {@link BigtableOptions} builder so that any further changes
- * will have no effect on the returned {@link BigtableIO.Read}.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
- checkNotNull(optionsBuilder, "optionsBuilder");
- // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Read(optionsWithAgent, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
- * using the given row filter.
- *
- * <p>Does not modify this object.
- */
- public Read withRowFilter(RowFilter filter) {
- checkNotNull(filter, "filter");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the specified table.
- *
- * <p>Does not modify this object.
- */
- public Read withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
- */
- public BigtableOptions getBigtableOptions() {
- return options;
- }
-
- /**
- * Returns the table being read from.
- */
- public String getTableId() {
- return tableId;
- }
-
- @Override
- public PCollection<Row> apply(PBegin input) {
- BigtableSource source =
- new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
- return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
- }
-
- @Override
- public void validate(PBegin input) {
- checkArgument(options != null, "BigtableOptions not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try {
- checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
- } catch (IOException e) {
- logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Read.class)
- .add("options", options)
- .add("tableId", tableId)
- .add("filter", filter)
- .toString();
- }
-
- /////////////////////////////////////////////////////////////////////////////////////////
- /**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
- * Cannot actually be {@code null} at validation time, but may start out {@code null} while
- * the source is being built.
- */
- @Nullable private final BigtableOptions options;
- private final String tableId;
- @Nullable private final RowFilter filter;
- @Nullable private final BigtableService bigtableService;
-
- private Read(
- @Nullable BigtableOptions options,
- String tableId,
- @Nullable RowFilter filter,
- @Nullable BigtableService bigtableService) {
- this.options = options;
- this.tableId = checkNotNull(tableId, "tableId");
- this.filter = filter;
- this.bigtableService = bigtableService;
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
- * service implementation.
- *
- * <p>This is used for testing.
- *
- * <p>Does not modify this object.
- */
- Read withBigtableService(BigtableService bigtableService) {
- checkNotNull(bigtableService, "bigtableService");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Helper function that either returns the mock Bigtable service supplied by
- * {@link #withBigtableService} or creates and returns an implementation that talks to
- * {@code Cloud Bigtable}.
- */
- private BigtableService getBigtableService() {
- if (bigtableService != null) {
- return bigtableService;
- }
- return new BigtableServiceImpl(options);
- }
- }
-
- /**
- * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
- * {@link BigtableIO} for more information.
- *
- * @see BigtableIO
- */
- @Experimental
- public static class Write
- extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
- /**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
- * Cannot actually be {@code null} at validation time, but may start out {@code null} while
- * the transform is being built.
- */
- @Nullable private final BigtableOptions options;
- private final String tableId;
- @Nullable private final BigtableService bigtableService;
-
- private Write(
- @Nullable BigtableOptions options,
- String tableId,
- @Nullable BigtableService bigtableService) {
- this.options = options;
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = bigtableService;
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Write withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "options");
- return withBigtableOptions(options.toBuilder());
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Clones the given {@link BigtableOptions} builder so that any further changes
- * will have no effect on the returned {@link BigtableIO.Write}.
- *
- * <p>Does not modify this object.
- */
- public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
- checkNotNull(optionsBuilder, "optionsBuilder");
- // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Write(optionsWithAgent, tableId, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Write(options, tableId, bigtableService);
- }
-
- /**
- * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
- */
- public BigtableOptions getBigtableOptions() {
- return options;
- }
-
- /**
- * Returns the table being written to.
- */
- public String getTableId() {
- return tableId;
- }
-
- @Override
- public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- Sink sink = new Sink(tableId, getBigtableService());
- return input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink));
- }
-
- @Override
- public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- checkArgument(options != null, "BigtableOptions not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try {
- checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
- } catch (IOException e) {
- logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- /**
- * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
- * service implementation.
- *
- * <p>This is used for testing.
- *
- * <p>Does not modify this object.
- */
- Write withBigtableService(BigtableService bigtableService) {
- checkNotNull(bigtableService, "bigtableService");
- return new Write(options, tableId, bigtableService);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Write.class)
- .add("options", options)
- .add("tableId", tableId)
- .toString();
- }
-
- /**
- * Helper function that either returns the mock Bigtable service supplied by
- * {@link #withBigtableService} or creates and returns an implementation that talks to
- * {@code Cloud Bigtable}.
- */
- private BigtableService getBigtableService() {
- if (bigtableService != null) {
- return bigtableService;
- }
- return new BigtableServiceImpl(options);
- }
- }
-
- //////////////////////////////////////////////////////////////////////////////////////////
- /** Disallow construction of utility class. */
- private BigtableIO() {}
-
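- /**
- * A {@link BoundedSource} that reads from a Cloud Bigtable table, optionally restricted by a
- * {@link RowFilter} and a {@link ByteKeyRange}, and that uses the sampled row keys exposed by
- * the service to size and split itself.
- */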
- static class BigtableSource extends BoundedSource<Row> {
- public BigtableSource(
- BigtableService service,
- String tableId,
- @Nullable RowFilter filter,
- ByteKeyRange range,
- Long estimatedSizeBytes) {
- this.service = service;
- this.tableId = tableId;
- this.filter = filter;
- this.range = range;
- this.estimatedSizeBytes = estimatedSizeBytes;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(BigtableSource.class)
- .add("tableId", tableId)
- .add("filter", filter)
- .add("range", range)
- .add("estimatedSizeBytes", estimatedSizeBytes)
- .toString();
- }
-
- ////// Private state and internal implementation details //////
- private final BigtableService service;
- @Nullable private final String tableId;
- @Nullable private final RowFilter filter;
- private final ByteKeyRange range;
- @Nullable private Long estimatedSizeBytes;
- @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
-
- protected BigtableSource withStartKey(ByteKey startKey) {
- checkNotNull(startKey, "startKey");
- return new BigtableSource(
- service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
- }
-
- protected BigtableSource withEndKey(ByteKey endKey) {
- checkNotNull(endKey, "endKey");
- return new BigtableSource(
- service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
- }
-
- protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
- checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
- return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
- }
-
- /**
- * Makes an API call to the Cloud Bigtable service that gives information about tablet key
- * boundaries and estimated sizes. We can use these samples to ensure that splits are on
- * different tablets, and possibly generate sub-splits within tablets.
- */
- private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
- return service.getSampleRowKeys(this);
- }
-
- @Override
- public List<BigtableSource> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // Update the desiredBundleSizeBytes in order to limit the
- // number of splits to maximumNumberOfSplits.
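- // For example, if the table is estimated at 400 GiB and the runner asks for 10 MB
- // bundles, the bundle size is raised to ~100 MiB so that at most 4000 splits are produced.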
- long maximumNumberOfSplits = 4000;
- long sizeEstimate = getEstimatedSizeBytes(options);
- desiredBundleSizeBytes =
- Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
-
- // Delegate to testable helper.
- return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
- }
-
- /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
- private List<BigtableSource> splitIntoBundlesBasedOnSamples(
- long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
- // There are no regions, or no samples available. Just scan the entire range.
- if (sampleRowKeys.isEmpty()) {
- logger.info("Not splitting source {} because no sample row keys are available.", this);
- return Collections.singletonList(this);
- }
-
- logger.info(
- "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
- desiredBundleSizeBytes,
- sampleRowKeys.size(),
- sampleRowKeys.get(0));
-
- // Loop through all sampled responses and generate splits from the ones that overlap the
- // scan range. The main complication is that we must track the end range of the previous
- // sample to generate good ranges.
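- // For example, with samples [("b", 100), ("d", 200)] and a scan range ["a", "c"), the
- // first sample produces a split ["a", "b") and the second a split ["b", "c").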
- ByteKey lastEndKey = ByteKey.EMPTY;
- long lastOffset = 0;
- ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
- for (SampleRowKeysResponse response : sampleRowKeys) {
- ByteKey responseEndKey = ByteKey.of(response.getRowKey());
- long responseOffset = response.getOffsetBytes();
- checkState(
- responseOffset >= lastOffset,
- "Expected response byte offset %s to come after the last offset %s",
- responseOffset,
- lastOffset);
-
- if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
- // This region does not overlap the scan, so skip it.
- lastOffset = responseOffset;
- lastEndKey = responseEndKey;
- continue;
- }
-
- // Calculate the beginning of the split as the larger of startKey and the end of the last
- // split. An unspecified start is the smallest key, so it is correctly treated as the
- // earliest key.
- ByteKey splitStartKey = lastEndKey;
- if (splitStartKey.compareTo(range.getStartKey()) < 0) {
- splitStartKey = range.getStartKey();
- }
-
- // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
- // that range.containsKey handles the case when range.getEndKey() is empty.
- ByteKey splitEndKey = responseEndKey;
- if (!range.containsKey(splitEndKey)) {
- splitEndKey = range.getEndKey();
- }
-
- // We know this region overlaps the desired key range, and we know a rough estimate of its
- // size. Split the key range into bundle-sized chunks and then add them all as splits.
- long sampleSizeBytes = responseOffset - lastOffset;
- List<BigtableSource> subSplits =
- splitKeyRangeIntoBundleSizedSubranges(
- sampleSizeBytes,
- desiredBundleSizeBytes,
- ByteKeyRange.of(splitStartKey, splitEndKey));
- splits.addAll(subSplits);
-
- // Move to the next region.
- lastEndKey = responseEndKey;
- lastOffset = responseOffset;
- }
-
- // We must add one more region after the end of the samples if both these conditions hold:
- // 1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
- // 2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
- if (!lastEndKey.isEmpty()
- && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
- splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
- }
-
- List<BigtableSource> ret = splits.build();
- logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
- return ret;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
- // Delegate to testable helper.
- if (estimatedSizeBytes == null) {
- estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
- }
- return estimatedSizeBytes;
- }
-
- /**
- * Computes the estimated size in bytes based on the total size of all samples that overlap
- * the key range this source will scan.
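- *
- * <p>For example, samples [("b", 100), ("d", 250)] scanned against the range ["a", "c")
- * yield an estimate of 250 bytes, because both sampled regions ["", "b") and ["b", "d")
- * overlap the range.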
- */
- private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
- long estimatedSizeBytes = 0;
- long lastOffset = 0;
- ByteKey currentStartKey = ByteKey.EMPTY;
- // Compute the total estimated size as the size of each sample that overlaps the scan range.
- // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
- // filter or to sample on a given key range.
- for (SampleRowKeysResponse response : samples) {
- ByteKey currentEndKey = ByteKey.of(response.getRowKey());
- long currentOffset = response.getOffsetBytes();
- if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
- // Skip an empty region.
- lastOffset = currentOffset;
- continue;
- } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
- estimatedSizeBytes += currentOffset - lastOffset;
- }
- currentStartKey = currentEndKey;
- lastOffset = currentOffset;
- }
- return estimatedSizeBytes;
- }
-
- /**
- * Cloud Bigtable returns query results ordered by key.
- */
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return true;
- }
-
- @Override
- public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
- return new BigtableReader(this, service);
- }
-
- @Override
- public void validate() {
- checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
- }
-
- @Override
- public Coder<Row> getDefaultOutputCoder() {
- return Proto2Coder.of(Row.class);
- }
-
- /** Helper that splits the specified range in this source into bundles. */
- private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
- long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
- // Catch the trivial cases. Split is small enough already, or this is the last region.
- logger.debug(
- "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
- sampleSizeBytes,
- desiredBundleSizeBytes);
- if (sampleSizeBytes <= desiredBundleSizeBytes) {
- return Collections.singletonList(
- this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
- }
-
- checkArgument(
- sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
- checkArgument(
- desiredBundleSizeBytes > 0,
- "Desired bundle size %s bytes must be greater than 0.",
- desiredBundleSizeBytes);
-
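- // E.g., a 1000-byte sample with a 300-byte desired bundle size yields
- // ceil(1000 / 300) = 4 subranges of 250 bytes each.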
- int splitCount = (int) Math.ceil((double) sampleSizeBytes / desiredBundleSizeBytes);
- List<ByteKey> splitKeys = range.split(splitCount);
- ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
- Iterator<ByteKey> keys = splitKeys.iterator();
- ByteKey prev = keys.next();
- while (keys.hasNext()) {
- ByteKey next = keys.next();
- splits.add(
- this
- .withStartKey(prev)
- .withEndKey(next)
- .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
- prev = next;
- }
- return splits.build();
- }
-
- public ByteKeyRange getRange() {
- return range;
- }
-
- public RowFilter getRowFilter() {
- return filter;
- }
-
- public String getTableId() {
- return tableId;
- }
- }
-
- private static class BigtableReader extends BoundedReader<Row> {
- // Thread-safety: source is protected via synchronization and is only accessed or modified
- // inside a synchronized block (or constructor, which is the same).
- private BigtableSource source;
- private BigtableService service;
- private BigtableService.Reader reader;
- private final ByteKeyRangeTracker rangeTracker;
- private long recordsReturned;
-
- public BigtableReader(BigtableSource source, BigtableService service) {
- this.source = source;
- this.service = service;
- rangeTracker = ByteKeyRangeTracker.of(source.getRange());
- }
-
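- // Both start() and advance() gate returned records through the range tracker, so rows
- // at or beyond a concurrently proposed split point are not emitted.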
- @Override
- public boolean start() throws IOException {
- reader = service.createReader(getCurrentSource());
- boolean hasRecord =
- reader.start()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
- if (hasRecord) {
- ++recordsReturned;
- }
- return hasRecord;
- }
-
- @Override
- public synchronized BigtableSource getCurrentSource() {
- return source;
- }
-
- @Override
- public boolean advance() throws IOException {
- boolean hasRecord =
- reader.advance()
- && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
- if (hasRecord) {
- ++recordsReturned;
- }
- return hasRecord;
- }
-
- @Override
- public Row getCurrent() throws NoSuchElementException {
- return reader.getCurrentRow();
- }
-
- @Override
- public void close() throws IOException {
- logger.info("Closing reader after reading {} records.", recordsReturned);
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- @Override
- public final Double getFractionConsumed() {
- return rangeTracker.getFractionConsumed();
- }
-
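- // Dynamic work rebalancing: interpolate a key at the given fraction of this source's
- // key range, shrink this reader's source to end at that key (the primary), and return
- // a new source covering the rest of the range (the residual).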
- @Override
- public final synchronized BigtableSource splitAtFraction(double fraction) {
- ByteKey splitKey;
- try {
- splitKey = source.getRange().interpolateKey(fraction);
- } catch (IllegalArgumentException e) {
- logger.info("%s: Failed to interpolate key for fraction %s.", source.getRange(), fraction);
- return null;
- }
- logger.debug(
- "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
- if (!rangeTracker.trySplitAtPosition(splitKey)) {
- return null;
- }
- BigtableSource primary = source.withEndKey(splitKey);
- BigtableSource residual = source.withStartKey(splitKey);
- this.source = primary;
- return residual;
- }
- }
-
- private static class Sink
- extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-
- public Sink(String tableId, BigtableService bigtableService) {
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = checkNotNull(bigtableService, "bigtableService");
- }
-
- public String getTableId() {
- return tableId;
- }
-
- public BigtableService getBigtableService() {
- return bigtableService;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Sink.class)
- .add("bigtableService", bigtableService)
- .add("tableId", tableId)
- .toString();
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- private final String tableId;
- private final BigtableService bigtableService;
-
- @Override
- public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
- PipelineOptions options) {
- return new BigtableWriteOperation(this);
- }
-
- /** Does nothing, as it is redundant with {@link Write#validate}. */
- @Override
- public void validate(PipelineOptions options) {}
- }
-
- private static class BigtableWriteOperation
- extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
- private final Sink sink;
-
- public BigtableWriteOperation(Sink sink) {
- this.sink = sink;
- }
-
- @Override
- public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
- throws Exception {
- return new BigtableWriter(this);
- }
-
- @Override
- public void initialize(PipelineOptions options) {}
-
- @Override
- public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
- long count = 0;
- for (Long value : writerResults) {
- count += value;
- }
- logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink);
- }
-
- @Override
- public Sink getSink() {
- return sink;
- }
-
- @Override
- public Coder<Long> getWriterResultCoder() {
- return VarLongCoder.of();
- }
- }
-
- private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
- private final BigtableWriteOperation writeOperation;
- private final Sink sink;
- private BigtableService.Writer bigtableWriter;
- private long recordsWritten;
- private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-
- public BigtableWriter(BigtableWriteOperation writeOperation) {
- this.writeOperation = writeOperation;
- this.sink = writeOperation.getSink();
- this.failures = new ConcurrentLinkedQueue<>();
- }
-
- @Override
- public void open(String uId) throws Exception {
- bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
- recordsWritten = 0;
- }
-
- /**
- * If any write has asynchronously failed, fail the bundle with a useful error.
- */
- private void checkForFailures() throws IOException {
- // Note that this function is never called by multiple threads and is the only place that
- // we remove from failures, so this code is safe.
- if (failures.isEmpty()) {
- return;
- }
-
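- // Drain and log at most the first 10 failures; the thrown message reports how many
- // more remain.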
- StringBuilder logEntry = new StringBuilder();
- int i = 0;
- for (; i < 10 && !failures.isEmpty(); ++i) {
- BigtableWriteException exc = failures.remove();
- logEntry.append("\n").append(exc.getMessage());
- if (exc.getCause() != null) {
- logEntry.append(": ").append(exc.getCause().getMessage());
- }
- }
- String message =
- String.format(
- "At least %d errors occurred writing to Bigtable. First %d errors: %s",
- i + failures.size(),
- i,
- logEntry.toString());
- logger.error(message);
- throw new IOException(message);
- }
-
- @Override
- public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
- checkForFailures();
- Futures.addCallback(
- bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
- ++recordsWritten;
- }
-
- @Override
- public Long close() throws Exception {
- bigtableWriter.close();
- bigtableWriter = null;
- checkForFailures();
- logger.info("Wrote {} records", recordsWritten);
- return recordsWritten;
- }
-
- @Override
- public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
- return writeOperation;
- }
-
- private class WriteExceptionCallback implements FutureCallback<Empty> {
- private final KV<ByteString, Iterable<Mutation>> value;
-
- public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
- this.value = value;
- }
-
- @Override
- public void onFailure(Throwable cause) {
- failures.add(new BigtableWriteException(value, cause));
- }
-
- @Override
- public void onSuccess(Empty produced) {}
- }
- }
-
- /**
- * An exception that puts information about the failed record being written in its message.
- */
- static class BigtableWriteException extends IOException {
- public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
- super(
- String.format(
- "Error mutating row %s with mutations %s",
- record.getKey().toStringUtf8(),
- record.getValue()),
- cause);
- }
- }
-
- /**
- * A helper function to produce a Cloud Bigtable user agent string.
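- *
- * <p>The result has the shape {@code "<name>/<version> (<java-version>); <client-version>"};
- * for example, a value like {@code "Dataflow-SDK/1.5.0 (1.7); 0.2.3"} (illustrative values).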
- */
- private static String getUserAgent() {
- String javaVersion = System.getProperty("java.specification.version");
- DataflowReleaseInfo info = DataflowReleaseInfo.getReleaseInfo();
- return String.format(
- "%s/%s (%s); %s",
- info.getName(),
- info.getVersion(),
- javaVersion,
- "0.2.3" /* TODO get Bigtable client version directly from jar. */);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index 85d706c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.io.bigtable;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An interface for real or fake implementations of Cloud Bigtable.
- */
-interface BigtableService extends Serializable {
-
- /**
- * The interface of a class that can write to Cloud Bigtable.
- */
- interface Writer {
- /**
- * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
- * row key to be mutated and the iterable of mutations represents the changes to be made to the
- * row.
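- *
- * <p>Writes are asynchronous: the returned future completes when the mutation has been
- * applied (or fails with the error). Callers that need to surface failures should attach
- * a callback to the future rather than block on each write.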
- *
- * @throws IOException if there is an error submitting the write.
- */
- ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
- throws IOException;
-
- /**
- * Closes the writer.
- *
- * @throws IOException if any writes did not succeed
- */
- void close() throws IOException;
- }
-
- /**
- * The interface of a class that reads from Cloud Bigtable.
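- *
- * <p>The expected call pattern is a single call to {@code start()}, then {@code advance()}
- * until it returns {@code false}, with {@code getCurrentRow()} valid after each successful
- * call, followed by {@code close()}.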
- */
- interface Reader {
- /**
- * Reads the first element (including initialization, such as opening a network connection) and
- * returns true if an element was found.
- */
- boolean start() throws IOException;
-
- /**
- * Attempts to read the next element, and returns true if an element has been read.
- */
- boolean advance() throws IOException;
-
- /**
- * Closes the reader.
- *
- * @throws IOException if there is an error.
- */
- void close() throws IOException;
-
- /**
- * Returns the last row read by a successful start() or advance(), or throws if there is no
- * current row because the last such call was unsuccessful.
- */
- Row getCurrentRow() throws NoSuchElementException;
- }
-
- /**
- * Returns {@code true} if the table with the given name exists.
- */
- boolean tableExists(String tableId) throws IOException;
-
- /**
- * Returns a {@link Reader} that will read from the specified source.
- */
- Reader createReader(BigtableSource source) throws IOException;
-
- /**
- * Returns a {@link Writer} that will write to the specified table.
- */
- Writer openForWriting(String tableId) throws IOException;
-
- /**
- * Returns a set of row keys sampled from the underlying table. These contain information about
- * the distribution of keys within the table.
- */
- List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-}