You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/25 21:53:17 UTC

[2/3] beam git commit: Fix code style issues for HBaseIO

Fix code style issues for HBaseIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5bdedd2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5bdedd2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5bdedd2

Branch: refs/heads/master
Commit: e5bdedd23208e484f6852eda44c59fb873645e8f
Parents: cdf050c
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Fri Aug 25 10:43:17 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 23:52:22 2017 +0200

----------------------------------------------------------------------
 .../io/hbase/HBaseCoderProviderRegistrar.java   |    8 +-
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 1090 +++++++++---------
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |   27 +-
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |    6 +-
 .../beam/sdk/io/hbase/SerializableScan.java     |   37 +-
 .../hbase/HBaseCoderProviderRegistrarTest.java  |    4 +-
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  814 +++++++------
 .../sdk/io/hbase/HBaseMutationCoderTest.java    |    4 +-
 .../beam/sdk/io/hbase/HBaseResultCoderTest.java |    4 +-
 .../beam/sdk/io/hbase/SerializableScanTest.java |    6 +-
 10 files changed, 987 insertions(+), 1013 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
index 2973d1b..f836ebe 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -26,15 +26,13 @@ import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.hbase.client.Result;
 
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
- */
+/** A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. */
 @AutoService(CoderProviderRegistrar.class)
 public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
   @Override
   public List<CoderProvider> getCoderProviders() {
     return ImmutableList.of(
-      HBaseMutationCoder.getCoderProvider(),
-      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+        HBaseMutationCoder.getCoderProvider(),
+        CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 7f58cef..41ced93 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -71,19 +71,19 @@ import org.slf4j.LoggerFactory;
 /**
  * A bounded source and sink for HBase.
  *
- * <p>For more information, see the online documentation at
- * <a href="https://hbase.apache.org/">HBase</a>.
+ * <p>For more information, see the online documentation at <a
+ * href="https://hbase.apache.org/">HBase</a>.
  *
  * <h3>Reading from HBase</h3>
  *
- * <p>The HBase source returns a set of rows from a single table, returning a
- * {@code PCollection<Result>}.
+ * <p>The HBase source returns a set of rows from a single table, returning a {@code
+ * PCollection<Result>}.
  *
- * <p>To configure a HBase source, you must supply a table id and a {@link Configuration}
- * to identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the
- * table. The row range to be read can optionally be restricted using with a {@link Scan} object
- * or using the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using
- * {@link HBaseIO.Read#withFilter}, for example:
+ * <p>To configure a HBase source, you must supply a table id and a {@link Configuration} to
+ * identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the table.
+ * The row range to be read can optionally be restricted using with a {@link Scan} object or using
+ * the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using {@link
+ * HBaseIO.Read#withFilter}, for example:
  *
  * <pre>{@code
  * // Scan the entire table.
@@ -118,12 +118,12 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Writing to HBase</h3>
  *
- * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an
- * idempotent transformation on a row.
+ * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a {@link
+ * PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an idempotent
+ * transformation on a row.
  *
- * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
- * to identify the HBase instance, for example:
+ * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} to identify
+ * the HBase instance, for example:
  *
  * <pre>{@code
  * Configuration configuration = ...;
@@ -137,605 +137,605 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Experimental</h3>
  *
- * <p>The design of the API for HBaseIO is currently related to the BigtableIO one,
- * it can evolve or be different in some aspects, but the idea is that users can easily migrate
- * from one to the other</p>.
+ * <p>The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or
+ * be different in some aspects, but the idea is that users can easily migrate from one to the other
+ * .
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class HBaseIO {
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
-
-    /** Disallow construction of utility class. */
-    private HBaseIO() {
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
+
+  /** Disallow construction of utility class. */
+  private HBaseIO() {}
+
+  /**
+   * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be initialized
+   * with a {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies the HBase instance,
+   * and a {@link HBaseIO.Read#withTableId tableId} that specifies which table to read. A {@link
+   * Filter} may also optionally be specified using {@link HBaseIO.Read#withFilter}.
+   */
+  @Experimental
+  public static Read read() {
+    return new Read(null, "", new SerializableScan(new Scan()));
+  }
+
+  /**
+   * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for
+   * more information.
+   *
+   * @see HBaseIO
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Result>> {
+    /**
+     * Returns a new {@link HBaseIO.Read} that will read from the HBase instance indicated by the
+     * given configuration.
+     */
+    public Read withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "conf");
+      return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
     }
 
     /**
-     * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be
-     * initialized with a
-     * {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies
-     * the HBase instance, and a {@link HBaseIO.Read#withTableId tableId} that
-     * specifies which table to read. A {@link Filter} may also optionally be specified using
-     * {@link HBaseIO.Read#withFilter}.
+     * Returns a new {@link HBaseIO.Read} that will read from the specified table.
+     *
+     * <p>Does not modify this object.
      */
-    @Experimental
-    public static Read read() {
-        return new Read(null, "", new SerializableScan(new Scan()));
+    public Read withTableId(String tableId) {
+      checkNotNull(tableId, "tableId");
+      return new Read(serializableConfiguration, tableId, serializableScan);
     }
 
     /**
-     * A {@link PTransform} that reads from HBase. See the class-level Javadoc on
-     * {@link HBaseIO} for more information.
+     * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
+     * scan.
      *
-     * @see HBaseIO
+     * <p>Does not modify this object.
      */
-    public static class Read extends PTransform<PBegin, PCollection<Result>> {
-        /**
-         * Returns a new {@link HBaseIO.Read} that will read from the HBase instance
-         * indicated by the given configuration.
-         */
-        public Read withConfiguration(Configuration configuration) {
-            checkNotNull(configuration, "conf");
-            return new Read(new SerializableConfiguration(configuration),
-                    tableId, serializableScan);
-        }
+    public Read withScan(Scan scan) {
+      checkNotNull(scan, "scan");
+      return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
+    }
 
-        /**
-         * Returns a new {@link HBaseIO.Read} that will read from the specified table.
-         *
-         * <p>Does not modify this object.
-         */
-        public Read withTableId(String tableId) {
-            checkNotNull(tableId, "tableId");
-            return new Read(serializableConfiguration, tableId, serializableScan);
-        }
+    /**
+     * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
+     * row filter.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withFilter(Filter filter) {
+      checkNotNull(filter, "filter");
+      return withScan(serializableScan.get().setFilter(filter));
+    }
 
-        /**
-         * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase
-         * using the given scan.
-         *
-         * <p>Does not modify this object.
-         */
-        public Read withScan(Scan scan) {
-            checkNotNull(scan, "scan");
-            return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
-        }
+    /**
+     * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withKeyRange(ByteKeyRange keyRange) {
+      checkNotNull(keyRange, "keyRange");
+      byte[] startRow = keyRange.getStartKey().getBytes();
+      byte[] stopRow = keyRange.getEndKey().getBytes();
+      return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
+    }
 
-        /**
-         * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase
-         * using the given row filter.
-         *
-         * <p>Does not modify this object.
-         */
-        public Read withFilter(Filter filter) {
-            checkNotNull(filter, "filter");
-            return withScan(serializableScan.get().setFilter(filter));
-        }
+    /**
+     * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+      checkNotNull(startRow, "startRow");
+      checkNotNull(stopRow, "stopRow");
+      ByteKeyRange keyRange =
+          ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+      return withKeyRange(keyRange);
+    }
 
-        /**
-         * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
-         *
-         * <p>Does not modify this object.
-         */
-        public Read withKeyRange(ByteKeyRange keyRange) {
-            checkNotNull(keyRange, "keyRange");
-            byte[] startRow = keyRange.getStartKey().getBytes();
-            byte[] stopRow = keyRange.getEndKey().getBytes();
-            return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
-        }
+    private Read(
+        SerializableConfiguration serializableConfiguration,
+        String tableId,
+        SerializableScan serializableScan) {
+      this.serializableConfiguration = serializableConfiguration;
+      this.tableId = tableId;
+      this.serializableScan = serializableScan;
+    }
 
-        /**
-         * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
-         *
-         * <p>Does not modify this object.
-         */
-        public Read withKeyRange(byte[] startRow, byte[] stopRow) {
-            checkNotNull(startRow, "startRow");
-            checkNotNull(stopRow, "stopRow");
-            ByteKeyRange keyRange =
-                    ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
-            return withKeyRange(keyRange);
-        }
+    @Override
+    public PCollection<Result> expand(PBegin input) {
+      HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
+      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+    }
 
-        private Read(SerializableConfiguration serializableConfiguration, String tableId,
-                     SerializableScan serializableScan) {
-            this.serializableConfiguration = serializableConfiguration;
-            this.tableId = tableId;
-            this.serializableScan = serializableScan;
-        }
+    @Override
+    public void validate(PipelineOptions options) {
+      checkArgument(serializableConfiguration != null, "Configuration not provided");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      try (Connection connection =
+          ConnectionFactory.createConnection(serializableConfiguration.get())) {
+        Admin admin = connection.getAdmin();
+        checkArgument(
+            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+      } catch (IOException e) {
+        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+    }
 
-        @Override
-        public PCollection<Result> expand(PBegin input) {
-            HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
-            return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
-        }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+      builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
+    }
 
-        @Override
-        public void validate(PipelineOptions options) {
-            checkArgument(serializableConfiguration != null,
-                    "Configuration not provided");
-            checkArgument(!tableId.isEmpty(), "Table ID not specified");
-            try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.get())) {
-                Admin admin = connection.getAdmin();
-                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
-                        "Table %s does not exist", tableId);
-            } catch (IOException e) {
-                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-            }
-        }
+    public String getTableId() {
+      return tableId;
+    }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-            super.populateDisplayData(builder);
-            builder.add(DisplayData.item("configuration",
-                    serializableConfiguration.get().toString()));
-            builder.add(DisplayData.item("tableId", tableId));
-            builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
-        }
+    public Configuration getConfiguration() {
+      return serializableConfiguration.get();
+    }
 
-        public String getTableId() {
-            return tableId;
-        }
+    /** Returns the range of keys that will be read from the table. */
+    public ByteKeyRange getKeyRange() {
+      byte[] startRow = serializableScan.get().getStartRow();
+      byte[] stopRow = serializableScan.get().getStopRow();
+      return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+    }
 
-        public Configuration getConfiguration() {
-            return serializableConfiguration.get();
-        }
+    private final SerializableConfiguration serializableConfiguration;
+    private final String tableId;
+    private final SerializableScan serializableScan;
+  }
 
-        /**
-         * Returns the range of keys that will be read from the table.
-         */
-        public ByteKeyRange getKeyRange() {
-            byte[] startRow = serializableScan.get().getStartRow();
-            byte[] stopRow = serializableScan.get().getStopRow();
-            return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
-        }
+  static class HBaseSource extends BoundedSource<Result> {
+    private final Read read;
+    @Nullable private Long estimatedSizeBytes;
 
-        private final SerializableConfiguration serializableConfiguration;
-        private final String tableId;
-        private final SerializableScan serializableScan;
+    HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
+      this.read = read;
+      this.estimatedSizeBytes = estimatedSizeBytes;
     }
 
-    static class HBaseSource extends BoundedSource<Result> {
-        private final Read read;
-        @Nullable private Long estimatedSizeBytes;
-
-        HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
-            this.read = read;
-            this.estimatedSizeBytes = estimatedSizeBytes;
-        }
+    HBaseSource withStartKey(ByteKey startKey) throws IOException {
+      checkNotNull(startKey, "startKey");
+      Read newRead =
+          new Read(
+              read.serializableConfiguration,
+              read.tableId,
+              new SerializableScan(
+                  new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes())));
+      return new HBaseSource(newRead, estimatedSizeBytes);
+    }
 
-        HBaseSource withStartKey(ByteKey startKey) throws IOException {
-            checkNotNull(startKey, "startKey");
-            Read newRead = new Read(read.serializableConfiguration, read.tableId,
-                new SerializableScan(
-                    new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes())));
-            return new HBaseSource(newRead, estimatedSizeBytes);
-        }
+    HBaseSource withEndKey(ByteKey endKey) throws IOException {
+      checkNotNull(endKey, "endKey");
+      Read newRead =
+          new Read(
+              read.serializableConfiguration,
+              read.tableId,
+              new SerializableScan(
+                  new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes())));
+      return new HBaseSource(newRead, estimatedSizeBytes);
+    }
 
-        HBaseSource withEndKey(ByteKey endKey) throws IOException {
-            checkNotNull(endKey, "endKey");
-            Read newRead = new Read(read.serializableConfiguration, read.tableId,
-                new SerializableScan(
-                    new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes())));
-            return new HBaseSource(newRead, estimatedSizeBytes);
-        }
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
+      if (estimatedSizeBytes == null) {
+        estimatedSizeBytes = estimateSizeBytes();
+        LOG.debug(
+            "Estimated size {} bytes for table {} and scan {}",
+            estimatedSizeBytes,
+            read.tableId,
+            read.serializableScan.get());
+      }
+      return estimatedSizeBytes;
+    }
 
-        @Override
-        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
-            if (estimatedSizeBytes == null) {
-                estimatedSizeBytes = estimateSizeBytes();
-                LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes,
-                        read.tableId, read.serializableScan.get());
+    /**
+     * This estimates the real size, it can be the compressed size depending on the HBase
+     * configuration.
+     */
+    private long estimateSizeBytes() throws Exception {
+      // This code is based on RegionSizeCalculator in hbase-server
+      long estimatedSizeBytes = 0L;
+      Configuration configuration = this.read.serializableConfiguration.get();
+      try (Connection connection = ConnectionFactory.createConnection(configuration)) {
+        // filter regions for the given table/scan
+        List<HRegionLocation> regionLocations = getRegionLocations(connection);
+
+        // builds set of regions who are part of the table scan
+        Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        for (HRegionLocation regionLocation : regionLocations) {
+          tableRegions.add(regionLocation.getRegionInfo().getRegionName());
+        }
+
+        // calculate estimated size for the regions
+        Admin admin = connection.getAdmin();
+        ClusterStatus clusterStatus = admin.getClusterStatus();
+        Collection<ServerName> servers = clusterStatus.getServers();
+        for (ServerName serverName : servers) {
+          ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+          for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+            byte[] regionId = regionLoad.getName();
+            if (tableRegions.contains(regionId)) {
+              long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L;
+              estimatedSizeBytes += regionSizeBytes;
             }
-            return estimatedSizeBytes;
+          }
         }
+      }
+      return estimatedSizeBytes;
+    }
 
-        /**
-         * This estimates the real size, it can be the compressed size depending on the HBase
-         * configuration.
-         */
-        private long estimateSizeBytes() throws Exception {
-            // This code is based on RegionSizeCalculator in hbase-server
-            long estimatedSizeBytes = 0L;
-            Configuration configuration = this.read.serializableConfiguration.get();
-            try (Connection connection = ConnectionFactory.createConnection(configuration)) {
-                // filter regions for the given table/scan
-                List<HRegionLocation> regionLocations = getRegionLocations(connection);
-
-                // builds set of regions who are part of the table scan
-                Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-                for (HRegionLocation regionLocation : regionLocations) {
-                    tableRegions.add(regionLocation.getRegionInfo().getRegionName());
-                }
-
-                // calculate estimated size for the regions
-                Admin admin = connection.getAdmin();
-                ClusterStatus clusterStatus = admin.getClusterStatus();
-                Collection<ServerName> servers = clusterStatus.getServers();
-                for (ServerName serverName : servers) {
-                    ServerLoad serverLoad = clusterStatus.getLoad(serverName);
-                    for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
-                        byte[] regionId = regionLoad.getName();
-                        if (tableRegions.contains(regionId)) {
-                            long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L;
-                            estimatedSizeBytes += regionSizeBytes;
-                        }
-                    }
-                }
-            }
-            return estimatedSizeBytes;
-        }
+    private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
+      final Scan scan = read.serializableScan.get();
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
 
-        private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
-            final Scan scan = read.serializableScan.get();
-            byte[] startRow = scan.getStartRow();
-            byte[] stopRow = scan.getStopRow();
-
-            final List<HRegionLocation> regionLocations = new ArrayList<>();
-
-            final boolean scanWithNoLowerBound = startRow.length == 0;
-            final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-            TableName tableName = TableName.valueOf(read.tableId);
-            RegionLocator regionLocator = connection.getRegionLocator(tableName);
-            List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
-            for (HRegionLocation regionLocation : tableRegionInfos) {
-                final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
-                final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
-                boolean isLastRegion = endKey.length == 0;
-                // filters regions who are part of the scan
-                if ((scanWithNoLowerBound
-                        || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
-                        && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
-                    regionLocations.add(regionLocation);
-                }
-            }
+      final List<HRegionLocation> regionLocations = new ArrayList<>();
 
-            return regionLocations;
-        }
+      final boolean scanWithNoLowerBound = startRow.length == 0;
+      final boolean scanWithNoUpperBound = stopRow.length == 0;
 
-        private List<HBaseSource>
-            splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits)
-                throws Exception {
-            final Scan scan = read.serializableScan.get();
-            byte[] startRow = scan.getStartRow();
-            byte[] stopRow = scan.getStopRow();
-
-            final List<HBaseSource> sources = new ArrayList<>(numSplits);
-            final boolean scanWithNoLowerBound = startRow.length == 0;
-            final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-            for (HRegionLocation regionLocation : regionLocations) {
-                final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
-                final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
-                boolean isLastRegion = endKey.length == 0;
-                String host = regionLocation.getHostnamePort();
-
-                final byte[] splitStart = (scanWithNoLowerBound
-                        || Bytes.compareTo(startKey, startRow) >= 0) ? startKey : startRow;
-                final byte[] splitStop =
-                        (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
-                                && !isLastRegion ? endKey : stopRow;
-
-                LOG.debug("{} {} {} {} {}", sources.size(), host, read.tableId,
-                        Bytes.toString(splitStart), Bytes.toString(splitStop));
-
-                // We need to create a new copy of the scan and read to add the new ranges
-                Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
-                Read newRead = new Read(read.serializableConfiguration, read.tableId,
-                        new SerializableScan(newScan));
-                sources.add(new HBaseSource(newRead, estimatedSizeBytes));
-            }
-            return sources;
+      TableName tableName = TableName.valueOf(read.tableId);
+      RegionLocator regionLocator = connection.getRegionLocator(tableName);
+      List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+      for (HRegionLocation regionLocation : tableRegionInfos) {
+        final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+        final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+        boolean isLastRegion = endKey.length == 0;
+        // filters regions who are part of the scan
+        if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
+            && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+          regionLocations.add(regionLocation);
         }
+      }
+
+      return regionLocations;
+    }
+
+    private List<HBaseSource> splitBasedOnRegions(
+        List<HRegionLocation> regionLocations, int numSplits) throws Exception {
+      final Scan scan = read.serializableScan.get();
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
+
+      final List<HBaseSource> sources = new ArrayList<>(numSplits);
+      final boolean scanWithNoLowerBound = startRow.length == 0;
+      final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+      for (HRegionLocation regionLocation : regionLocations) {
+        final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+        final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+        boolean isLastRegion = endKey.length == 0;
+        String host = regionLocation.getHostnamePort();
+
+        final byte[] splitStart =
+            (scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0)
+                ? startKey
+                : startRow;
+        final byte[] splitStop =
+            (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion
+                ? endKey
+                : stopRow;
+
+        LOG.debug(
+            "{} {} {} {} {}",
+            sources.size(),
+            host,
+            read.tableId,
+            Bytes.toString(splitStart),
+            Bytes.toString(splitStop));
+
+        // We need to create a new copy of the scan and read to add the new ranges
+        Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
+        Read newRead =
+            new Read(read.serializableConfiguration, read.tableId, new SerializableScan(newScan));
+        sources.add(new HBaseSource(newRead, estimatedSizeBytes));
+      }
+      return sources;
+    }
 
     @Override
     public List<? extends BoundedSource<Result>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-            LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
-            long estimatedSizeBytes = getEstimatedSizeBytes(options);
-            int numSplits = 1;
-            if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) {
-                numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
-            }
-
-            try (Connection connection = ConnectionFactory.createConnection(
-                    read.getConfiguration())) {
-                List<HRegionLocation> regionLocations = getRegionLocations(connection);
-                int realNumSplits =
-                        numSplits < regionLocations.size() ? regionLocations.size() : numSplits;
-                LOG.debug("Suggested {} bundle(s) based on size", numSplits);
-                LOG.debug("Suggested {} bundle(s) based on number of regions",
-                        regionLocations.size());
-                final List<HBaseSource> sources = splitBasedOnRegions(regionLocations,
-                        realNumSplits);
-                LOG.debug("Split into {} bundle(s)", sources.size());
-                if (numSplits >= 1) {
-                    return sources;
-                }
-                return Collections.singletonList(this);
-            }
-        }
-
-        @Override
-        public BoundedReader<Result> createReader(PipelineOptions pipelineOptions)
-                throws IOException {
-            return new HBaseReader(this);
-        }
-
-        @Override
-        public void validate() {
-            read.validate(null /* input */);
-        }
+      LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
+      long estimatedSizeBytes = getEstimatedSizeBytes(options);
+      int numSplits = 1;
+      if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) {
+        numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
+      }
+
+      try (Connection connection = ConnectionFactory.createConnection(read.getConfiguration())) {
+        List<HRegionLocation> regionLocations = getRegionLocations(connection);
+        int realNumSplits = numSplits < regionLocations.size() ? regionLocations.size() : numSplits;
+        LOG.debug("Suggested {} bundle(s) based on size", numSplits);
+        LOG.debug("Suggested {} bundle(s) based on number of regions", regionLocations.size());
+        final List<HBaseSource> sources = splitBasedOnRegions(regionLocations, realNumSplits);
+        LOG.debug("Split into {} bundle(s)", sources.size());
+        if (numSplits >= 1) {
+          return sources;
+        }
+        return Collections.singletonList(this);
+      }
+    }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-            read.populateDisplayData(builder);
-        }
+    @Override
+    public BoundedReader<Result> createReader(PipelineOptions pipelineOptions) throws IOException {
+      return new HBaseReader(this);
+    }
 
-        @Override
-        public Coder<Result> getOutputCoder() {
-            return HBaseResultCoder.of();
-        }
+    @Override
+    public void validate() {
+      read.validate(null /* input */);
     }
 
-    private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
-        private HBaseSource source;
-        private Connection connection;
-        private ResultScanner scanner;
-        private Iterator<Result> iter;
-        private Result current;
-        private final ByteKeyRangeTracker rangeTracker;
-        private long recordsReturned;
-
-        HBaseReader(HBaseSource source) {
-            this.source = source;
-            Scan scan = source.read.serializableScan.get();
-            ByteKeyRange range = ByteKeyRange
-                .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow()));
-            rangeTracker = ByteKeyRangeTracker.of(range);
-        }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      read.populateDisplayData(builder);
+    }
 
-        @Override
-        public boolean start() throws IOException {
-            HBaseSource source = getCurrentSource();
-            Configuration configuration = source.read.serializableConfiguration.get();
-            String tableId = source.read.tableId;
-            connection = ConnectionFactory.createConnection(configuration);
-            TableName tableName = TableName.valueOf(tableId);
-            Table table = connection.getTable(tableName);
-            // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
-            Scan scanClone = new Scan(source.read.serializableScan.get());
-            scanner = table.getScanner(scanClone);
-            iter = scanner.iterator();
-            return advance();
-        }
+    @Override
+    public Coder<Result> getOutputCoder() {
+      return HBaseResultCoder.of();
+    }
+  }
+
+  private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
+    private HBaseSource source;
+    private Connection connection;
+    private ResultScanner scanner;
+    private Iterator<Result> iter;
+    private Result current;
+    private final ByteKeyRangeTracker rangeTracker;
+    private long recordsReturned;
+
+    HBaseReader(HBaseSource source) {
+      this.source = source;
+      Scan scan = source.read.serializableScan.get();
+      ByteKeyRange range =
+          ByteKeyRange.of(
+              ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow()));
+      rangeTracker = ByteKeyRangeTracker.of(range);
+    }
 
-        @Override
-        public Result getCurrent() throws NoSuchElementException {
-            return current;
-        }
+    @Override
+    public boolean start() throws IOException {
+      HBaseSource source = getCurrentSource();
+      Configuration configuration = source.read.serializableConfiguration.get();
+      String tableId = source.read.tableId;
+      connection = ConnectionFactory.createConnection(configuration);
+      TableName tableName = TableName.valueOf(tableId);
+      Table table = connection.getTable(tableName);
+      // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
+      Scan scanClone = new Scan(source.read.serializableScan.get());
+      scanner = table.getScanner(scanClone);
+      iter = scanner.iterator();
+      return advance();
+    }
 
-        @Override
-        public boolean advance() throws IOException {
-            if (!iter.hasNext()) {
-                return rangeTracker.markDone();
-            }
-            final Result next = iter.next();
-            boolean hasRecord =
-                rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
-                || rangeTracker.markDone();
-            if (hasRecord) {
-                current = next;
-                ++recordsReturned;
-            }
-            return hasRecord;
-        }
+    @Override
+    public Result getCurrent() throws NoSuchElementException {
+      return current;
+    }
 
-        @Override
-        public void close() throws IOException {
-            LOG.debug("Closing reader after reading {} records.", recordsReturned);
-            if (scanner != null) {
-                scanner.close();
-                scanner = null;
-            }
-            if (connection != null) {
-                connection.close();
-                connection = null;
-            }
-        }
+    @Override
+    public boolean advance() throws IOException {
+      if (!iter.hasNext()) {
+        return rangeTracker.markDone();
+      }
+      final Result next = iter.next();
+      boolean hasRecord =
+          rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
+              || rangeTracker.markDone();
+      if (hasRecord) {
+        current = next;
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
 
-        @Override
-        public synchronized HBaseSource getCurrentSource() {
-            return source;
-        }
+    @Override
+    public void close() throws IOException {
+      LOG.debug("Closing reader after reading {} records.", recordsReturned);
+      if (scanner != null) {
+        scanner.close();
+        scanner = null;
+      }
+      if (connection != null) {
+        connection.close();
+        connection = null;
+      }
+    }
 
-        @Override
-        public final Double getFractionConsumed() {
-            return rangeTracker.getFractionConsumed();
-        }
+    @Override
+    public synchronized HBaseSource getCurrentSource() {
+      return source;
+    }
 
-        @Override
-        public final long getSplitPointsConsumed() {
-            return rangeTracker.getSplitPointsConsumed();
-        }
+    @Override
+    public final Double getFractionConsumed() {
+      return rangeTracker.getFractionConsumed();
+    }
 
-        @Override
-        @Nullable
-        public final synchronized HBaseSource splitAtFraction(double fraction) {
-            ByteKey splitKey;
-            try {
-                splitKey = rangeTracker.getRange().interpolateKey(fraction);
-            } catch (RuntimeException e) {
-                LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(),
-                    fraction, e);
-                return null;
-            }
-            LOG.info(
-                "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
-            HBaseSource primary;
-            HBaseSource residual;
-            try {
-                primary = source.withEndKey(splitKey);
-                residual = source.withStartKey(splitKey);
-            } catch (Exception e) {
-                LOG.info(
-                    "{}: Interpolating for fraction {} yielded invalid split key {}.",
-                    rangeTracker.getRange(),
-                    fraction,
-                    splitKey,
-                    e);
-                return null;
-            }
-            if (!rangeTracker.trySplitAtPosition(splitKey)) {
-                return null;
-            }
-            this.source = primary;
-            return residual;
-        }
+    @Override
+    public final long getSplitPointsConsumed() {
+      return rangeTracker.getSplitPointsConsumed();
     }
 
+    @Override
+    @Nullable
+    public final synchronized HBaseSource splitAtFraction(double fraction) {
+      ByteKey splitKey;
+      try {
+        splitKey = rangeTracker.getRange().interpolateKey(fraction);
+      } catch (RuntimeException e) {
+        LOG.info(
+            "{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), fraction, e);
+        return null;
+      }
+      LOG.info("Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+      HBaseSource primary;
+      HBaseSource residual;
+      try {
+        primary = source.withEndKey(splitKey);
+        residual = source.withStartKey(splitKey);
+      } catch (Exception e) {
+        LOG.info(
+            "{}: Interpolating for fraction {} yielded invalid split key {}.",
+            rangeTracker.getRange(),
+            fraction,
+            splitKey,
+            e);
+        return null;
+      }
+      if (!rangeTracker.trySplitAtPosition(splitKey)) {
+        return null;
+      }
+      this.source = primary;
+      return residual;
+    }
+  }
+
+  /**
+   * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be
+   * initialized with a {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies the
+   * destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId} that specifies
+   * which table to write.
+   */
+  public static Write write() {
+    return new Write(null /* SerializableConfiguration */, "");
+  }
+
+  /**
+   * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for
+   * more information.
+   *
+   * @see HBaseIO
+   */
+  public static class Write extends PTransform<PCollection<Mutation>, PDone> {
     /**
-     * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be
-     * initialized with a
-     * {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies
-     * the destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId}
-     * that specifies which table to write.
+     * Returns a new {@link HBaseIO.Write} that will write to the HBase instance indicated by the
+     * given Configuration, and using any other specified customizations.
+     *
+     * <p>Does not modify this object.
      */
-    public static Write write() {
-        return new Write(null /* SerializableConfiguration */, "");
+    public Write withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "conf");
+      return new Write(new SerializableConfiguration(configuration), tableId);
     }
 
     /**
-     * A {@link PTransform} that writes to HBase. See the class-level Javadoc on
-     * {@link HBaseIO} for more information.
+     * Returns a new {@link HBaseIO.Write} that will write to the specified table.
      *
-     * @see HBaseIO
+     * <p>Does not modify this object.
      */
-    public static class Write extends PTransform<PCollection<Mutation>, PDone> {
-        /**
-         * Returns a new {@link HBaseIO.Write} that will write to the HBase instance
-         * indicated by the given Configuration, and using any other specified customizations.
-         *
-         * <p>Does not modify this object.
-         */
-        public Write withConfiguration(Configuration configuration) {
-            checkNotNull(configuration, "conf");
-            return new Write(new SerializableConfiguration(configuration), tableId);
-        }
-
-        /**
-         * Returns a new {@link HBaseIO.Write} that will write to the specified table.
-         *
-         * <p>Does not modify this object.
-         */
-        public Write withTableId(String tableId) {
-            checkNotNull(tableId, "tableId");
-            return new Write(serializableConfiguration, tableId);
-        }
-
-        private Write(SerializableConfiguration serializableConfiguration, String tableId) {
-            this.serializableConfiguration = serializableConfiguration;
-            this.tableId = tableId;
-        }
-
-        @Override
-        public PDone expand(PCollection<Mutation> input) {
-            input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
-            return PDone.in(input.getPipeline());
-        }
-
-        @Override
-        public void validate(PipelineOptions options) {
-            checkArgument(serializableConfiguration != null, "Configuration not specified");
-            checkArgument(!tableId.isEmpty(), "Table ID not specified");
-            try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.get())) {
-                Admin admin = connection.getAdmin();
-                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
-                        "Table %s does not exist", tableId);
-            } catch (IOException e) {
-                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-            }
-        }
-
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-            super.populateDisplayData(builder);
-            builder.add(DisplayData.item("configuration",
-                    serializableConfiguration.get().toString()));
-            builder.add(DisplayData.item("tableId", tableId));
-        }
-
-        public String getTableId() {
-            return tableId;
-        }
-
-        public Configuration getConfiguration() {
-            return serializableConfiguration.get();
-        }
-
-        private final String tableId;
-        private final SerializableConfiguration serializableConfiguration;
-
-        private class HBaseWriterFn extends DoFn<Mutation, Void> {
-
-            public HBaseWriterFn(String tableId,
-                                 SerializableConfiguration serializableConfiguration) {
-                this.tableId = checkNotNull(tableId, "tableId");
-                this.serializableConfiguration = checkNotNull(serializableConfiguration,
-                        "serializableConfiguration");
-            }
-
-            @Setup
-            public void setup() throws Exception {
-                connection = ConnectionFactory.createConnection(serializableConfiguration.get());
-            }
-
-            @StartBundle
-            public void startBundle(StartBundleContext c) throws IOException {
-                BufferedMutatorParams params =
-                    new BufferedMutatorParams(TableName.valueOf(tableId));
-                mutator = connection.getBufferedMutator(params);
-                recordsWritten = 0;
-            }
+    public Write withTableId(String tableId) {
+      checkNotNull(tableId, "tableId");
+      return new Write(serializableConfiguration, tableId);
+    }
 
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-                mutator.mutate(c.element());
-                ++recordsWritten;
-            }
+    private Write(SerializableConfiguration serializableConfiguration, String tableId) {
+      this.serializableConfiguration = serializableConfiguration;
+      this.tableId = tableId;
+    }
 
-            @FinishBundle
-            public void finishBundle() throws Exception {
-                mutator.flush();
-                LOG.debug("Wrote {} records", recordsWritten);
-            }
+    @Override
+    public PDone expand(PCollection<Mutation> input) {
+      input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
+      return PDone.in(input.getPipeline());
+    }
 
-            @Teardown
-            public void tearDown() throws Exception {
-                if (mutator != null) {
-                    mutator.close();
-                    mutator = null;
-                }
-                if (connection != null) {
-                    connection.close();
-                    connection = null;
-                }
-            }
+    @Override
+    public void validate(PipelineOptions options) {
+      checkArgument(serializableConfiguration != null, "Configuration not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      try (Connection connection =
+          ConnectionFactory.createConnection(serializableConfiguration.get())) {
+        Admin admin = connection.getAdmin();
+        checkArgument(
+            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+      } catch (IOException e) {
+        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+    }
 
-            @Override
-            public void populateDisplayData(DisplayData.Builder builder) {
-                builder.delegate(Write.this);
-            }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
 
-            private final String tableId;
-            private final SerializableConfiguration serializableConfiguration;
+    public String getTableId() {
+      return tableId;
+    }
 
-            private Connection connection;
-            private BufferedMutator mutator;
+    public Configuration getConfiguration() {
+      return serializableConfiguration.get();
+    }
 
-            private long recordsWritten;
-        }
+    private final String tableId;
+    private final SerializableConfiguration serializableConfiguration;
+
+    private class HBaseWriterFn extends DoFn<Mutation, Void> {
+
+      public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) {
+        this.tableId = checkNotNull(tableId, "tableId");
+        this.serializableConfiguration =
+            checkNotNull(serializableConfiguration, "serializableConfiguration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = ConnectionFactory.createConnection(serializableConfiguration.get());
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableId));
+        mutator = connection.getBufferedMutator(params);
+        recordsWritten = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        mutator.mutate(c.element());
+        ++recordsWritten;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        mutator.flush();
+        LOG.debug("Wrote {} records", recordsWritten);
+      }
+
+      @Teardown
+      public void tearDown() throws Exception {
+        if (mutator != null) {
+          mutator.close();
+          mutator = null;
+        }
+        if (connection != null) {
+          connection.close();
+          connection = null;
+        }
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.delegate(Write.this);
+      }
+
+      private final String tableId;
+      private final SerializableConfiguration serializableConfiguration;
+
+      private Connection connection;
+      private BufferedMutator mutator;
+
+      private long recordsWritten;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index ee83114..e7a36d5 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -71,30 +71,29 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   }
 
   /**
-   * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for
-   * {@link Mutation mutations}.
+   * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for {@link Mutation
+   * mutations}.
    */
   static CoderProvider getCoderProvider() {
     return HBASE_MUTATION_CODER_PROVIDER;
   }
 
   private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER =
-    new HBaseMutationCoderProvider();
+      new HBaseMutationCoderProvider();
 
-  /**
-   * A {@link CoderProvider} for {@link Mutation mutations}.
-   */
+  /** A {@link CoderProvider} for {@link Mutation mutations}. */
   private static class HBaseMutationCoderProvider extends CoderProvider {
     @Override
-    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
-        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+    public <T> Coder<T> coderFor(
+        TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+        throws CannotProvideCoderException {
       if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) {
         throw new CannotProvideCoderException(
-          String.format(
-            "Cannot provide %s because %s is not a subclass of %s",
-            HBaseMutationCoder.class.getSimpleName(),
-            typeDescriptor,
-            Mutation.class.getName()));
+            String.format(
+                "Cannot provide %s because %s is not a subclass of %s",
+                HBaseMutationCoder.class.getSimpleName(),
+                typeDescriptor,
+                Mutation.class.getName()));
       }
 
       try {
@@ -106,5 +105,5 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   }
 
   private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR =
-    new TypeDescriptor<Mutation>() {};
+      new TypeDescriptor<Mutation>() {};
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 1d06635..bce1567 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,14 +41,12 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
   }
 
   @Override
-  public void encode(Result value, OutputStream outputStream)
-          throws IOException {
+  public void encode(Result value, OutputStream outputStream) throws IOException {
     ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
   }
 
   @Override
-  public Result decode(InputStream inputStream)
-      throws IOException {
+  public Result decode(InputStream inputStream) throws IOException {
     return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
index f3bc7ac..6ed3c51 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
@@ -25,31 +25,28 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 
-/**
- * This is just a wrapper class to serialize HBase {@link Scan} using Protobuf.
- */
+/** This is just a wrapper class to serialize HBase {@link Scan} using Protobuf. */
 class SerializableScan implements Serializable {
-    private transient Scan scan;
+  private transient Scan scan;
 
-    public SerializableScan() {
-    }
+  public SerializableScan() {}
 
-    public SerializableScan(Scan scan) {
-        if (scan == null) {
-            throw new NullPointerException("Scan must not be null.");
-        }
-        this.scan = scan;
+  public SerializableScan(Scan scan) {
+    if (scan == null) {
+      throw new NullPointerException("Scan must not be null.");
     }
+    this.scan = scan;
+  }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        ProtobufUtil.toScan(scan).writeDelimitedTo(out);
-    }
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    ProtobufUtil.toScan(scan).writeDelimitedTo(out);
+  }
 
-    private void readObject(ObjectInputStream in) throws IOException {
-        scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
-    }
+  private void readObject(ObjectInputStream in) throws IOException {
+    scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
+  }
 
-    public Scan get() {
-        return scan;
-    }
+  public Scan get() {
+    return scan;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
index 5b2e138..25369fc 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -26,9 +26,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for {@link HBaseCoderProviderRegistrar}.
- */
+/** Tests for {@link HBaseCoderProviderRegistrar}. */
 @RunWith(JUnit4.class)
 public class HBaseCoderProviderRegistrarTest {
   @Test