Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/25 21:53:17 UTC
[2/3] beam git commit: Fix code style issues for HBaseIO
Fix code style issues for HBaseIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5bdedd2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5bdedd2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5bdedd2
Branch: refs/heads/master
Commit: e5bdedd23208e484f6852eda44c59fb873645e8f
Parents: cdf050c
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Fri Aug 25 10:43:17 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Aug 25 23:52:22 2017 +0200
----------------------------------------------------------------------
.../io/hbase/HBaseCoderProviderRegistrar.java | 8 +-
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 1090 +++++++++---------
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 27 +-
.../beam/sdk/io/hbase/HBaseResultCoder.java | 6 +-
.../beam/sdk/io/hbase/SerializableScan.java | 37 +-
.../hbase/HBaseCoderProviderRegistrarTest.java | 4 +-
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 814 +++++++------
.../sdk/io/hbase/HBaseMutationCoderTest.java | 4 +-
.../beam/sdk/io/hbase/HBaseResultCoderTest.java | 4 +-
.../beam/sdk/io/hbase/SerializableScanTest.java | 6 +-
10 files changed, 987 insertions(+), 1013 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
index 2973d1b..f836ebe 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -26,15 +26,13 @@ import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.hbase.client.Result;
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
- */
+/** A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. */
@AutoService(CoderProviderRegistrar.class)
public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
- HBaseMutationCoder.getCoderProvider(),
- CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+ HBaseMutationCoder.getCoderProvider(),
+ CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 7f58cef..41ced93 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -71,19 +71,19 @@ import org.slf4j.LoggerFactory;
/**
* A bounded source and sink for HBase.
*
- * <p>For more information, see the online documentation at
- * <a href="https://hbase.apache.org/">HBase</a>.
+ * <p>For more information, see the online documentation at <a
+ * href="https://hbase.apache.org/">HBase</a>.
*
* <h3>Reading from HBase</h3>
*
- * <p>The HBase source returns a set of rows from a single table, returning a
- * {@code PCollection<Result>}.
+ * <p>The HBase source reads a set of rows from a single table, returning a {@code
+ * PCollection<Result>}.
*
- * <p>To configure a HBase source, you must supply a table id and a {@link Configuration}
- * to identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the
- * table. The row range to be read can optionally be restricted using with a {@link Scan} object
- * or using the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using
- * {@link HBaseIO.Read#withFilter}, for example:
+ * <p>To configure a HBase source, you must supply a table id and a {@link Configuration} to
+ * identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the table.
+ * The row range to be read can optionally be restricted with a {@link Scan} object or with
+ * {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} can be applied using {@link
+ * HBaseIO.Read#withFilter}, for example:
*
* <pre>{@code
* // Scan the entire table.
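For context, the read path described in the Javadoc above can be exercised end to end roughly as
follows. This is a minimal sketch, assuming a reachable HBase cluster; the quorum host, table id,
and key range are placeholder values, not anything taken from this commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    // Placeholder cluster settings; point these at a real HBase instance.
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "zk1.example.com");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Read only the rows in ["row-000", "row-999"), via withKeyRange(byte[], byte[]).
    PCollection<Result> rows =
        p.apply(
            HBaseIO.read()
                .withConfiguration(configuration)
                .withTableId("my-table")
                .withKeyRange(Bytes.toBytes("row-000"), Bytes.toBytes("row-999")));
    p.run().waitUntilFinish();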
@@ -118,12 +118,12 @@ import org.slf4j.LoggerFactory;
*
* <h3>Writing to HBase</h3>
*
- * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<Mutation>}, where each {@link Mutation} represents an
- * idempotent transformation on a row.
+ * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a {@link
+ * PCollection PCollection<Mutation>}, where each {@link Mutation} represents an idempotent
+ * transformation on a row.
*
- * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
- * to identify the HBase instance, for example:
+ * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} to identify
+ * the HBase instance, for example:
*
* <pre>{@code
* Configuration configuration = ...;
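The write path is symmetric. A minimal sketch, reusing the `configuration` and pipeline `p` from
the read sketch above; the row, column family, and qualifier are placeholders:

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    // A single idempotent Put; any Mutation subtype goes through the same sink,
    // and its coder is resolved by the registrar in this change.
    Mutation put =
        new Put(Bytes.toBytes("row-001"))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    p.apply(Create.of(put))
        .apply(HBaseIO.write().withConfiguration(configuration).withTableId("my-table"));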
@@ -137,605 +137,605 @@ import org.slf4j.LoggerFactory;
*
* <h3>Experimental</h3>
*
- * <p>The design of the API for HBaseIO is currently related to the BigtableIO one,
- * it can evolve or be different in some aspects, but the idea is that users can easily migrate
- * from one to the other</p>.
+ * <p>The design of the HBaseIO API is currently modeled on the BigtableIO one; it may evolve or
+ * differ in some aspects, but the idea is that users can easily migrate from one to the other.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class HBaseIO {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
-
- /** Disallow construction of utility class. */
- private HBaseIO() {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
+
+ /** Disallow construction of utility class. */
+ private HBaseIO() {}
+
+ /**
+ * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be initialized
+ * with a {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies the HBase instance,
+ * and a {@link HBaseIO.Read#withTableId tableId} that specifies which table to read. A {@link
+ * Filter} may also optionally be specified using {@link HBaseIO.Read#withFilter}.
+ */
+ @Experimental
+ public static Read read() {
+ return new Read(null, "", new SerializableScan(new Scan()));
+ }
+
+ /**
+ * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for
+ * more information.
+ *
+ * @see HBaseIO
+ */
+ public static class Read extends PTransform<PBegin, PCollection<Result>> {
+ /**
+ * Returns a new {@link HBaseIO.Read} that will read from the HBase instance indicated by the
+ * given configuration.
+ */
+ public Read withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "conf");
+ return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
}
/**
- * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be
- * initialized with a
- * {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies
- * the HBase instance, and a {@link HBaseIO.Read#withTableId tableId} that
- * specifies which table to read. A {@link Filter} may also optionally be specified using
- * {@link HBaseIO.Read#withFilter}.
+ * Returns a new {@link HBaseIO.Read} that will read from the specified table.
+ *
+ * <p>Does not modify this object.
*/
- @Experimental
- public static Read read() {
- return new Read(null, "", new SerializableScan(new Scan()));
+ public Read withTableId(String tableId) {
+ checkNotNull(tableId, "tableId");
+ return new Read(serializableConfiguration, tableId, serializableScan);
}
/**
- * A {@link PTransform} that reads from HBase. See the class-level Javadoc on
- * {@link HBaseIO} for more information.
+ * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
+ * scan.
*
- * @see HBaseIO
+ * <p>Does not modify this object.
*/
- public static class Read extends PTransform<PBegin, PCollection<Result>> {
- /**
- * Returns a new {@link HBaseIO.Read} that will read from the HBase instance
- * indicated by the given configuration.
- */
- public Read withConfiguration(Configuration configuration) {
- checkNotNull(configuration, "conf");
- return new Read(new SerializableConfiguration(configuration),
- tableId, serializableScan);
- }
+ public Read withScan(Scan scan) {
+ checkNotNull(scan, "scan");
+ return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
+ }
- /**
- * Returns a new {@link HBaseIO.Read} that will read from the specified table.
- *
- * <p>Does not modify this object.
- */
- public Read withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Read(serializableConfiguration, tableId, serializableScan);
- }
+ /**
+ * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
+ * row filter.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withFilter(Filter filter) {
+ checkNotNull(filter, "filter");
+ return withScan(serializableScan.get().setFilter(filter));
+ }
- /**
- * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase
- * using the given scan.
- *
- * <p>Does not modify this object.
- */
- public Read withScan(Scan scan) {
- checkNotNull(scan, "scan");
- return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
- }
+ /**
+ * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withKeyRange(ByteKeyRange keyRange) {
+ checkNotNull(keyRange, "keyRange");
+ byte[] startRow = keyRange.getStartKey().getBytes();
+ byte[] stopRow = keyRange.getEndKey().getBytes();
+ return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
+ }
- /**
- * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase
- * using the given row filter.
- *
- * <p>Does not modify this object.
- */
- public Read withFilter(Filter filter) {
- checkNotNull(filter, "filter");
- return withScan(serializableScan.get().setFilter(filter));
- }
+ /**
+ * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+ checkNotNull(startRow, "startRow");
+ checkNotNull(stopRow, "stopRow");
+ ByteKeyRange keyRange =
+ ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+ return withKeyRange(keyRange);
+ }
- /**
- * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
- *
- * <p>Does not modify this object.
- */
- public Read withKeyRange(ByteKeyRange keyRange) {
- checkNotNull(keyRange, "keyRange");
- byte[] startRow = keyRange.getStartKey().getBytes();
- byte[] stopRow = keyRange.getEndKey().getBytes();
- return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
- }
+ private Read(
+ SerializableConfiguration serializableConfiguration,
+ String tableId,
+ SerializableScan serializableScan) {
+ this.serializableConfiguration = serializableConfiguration;
+ this.tableId = tableId;
+ this.serializableScan = serializableScan;
+ }
- /**
- * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
- *
- * <p>Does not modify this object.
- */
- public Read withKeyRange(byte[] startRow, byte[] stopRow) {
- checkNotNull(startRow, "startRow");
- checkNotNull(stopRow, "stopRow");
- ByteKeyRange keyRange =
- ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
- return withKeyRange(keyRange);
- }
+ @Override
+ public PCollection<Result> expand(PBegin input) {
+ HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
+ return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+ }
- private Read(SerializableConfiguration serializableConfiguration, String tableId,
- SerializableScan serializableScan) {
- this.serializableConfiguration = serializableConfiguration;
- this.tableId = tableId;
- this.serializableScan = serializableScan;
- }
+ @Override
+ public void validate(PipelineOptions options) {
+ checkArgument(serializableConfiguration != null, "Configuration not provided");
+ checkArgument(!tableId.isEmpty(), "Table ID not specified");
+ try (Connection connection =
+ ConnectionFactory.createConnection(serializableConfiguration.get())) {
+ Admin admin = connection.getAdmin();
+ checkArgument(
+ admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+ }
+ }
- @Override
- public PCollection<Result> expand(PBegin input) {
- HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
- return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
+ }
- @Override
- public void validate(PipelineOptions options) {
- checkArgument(serializableConfiguration != null,
- "Configuration not provided");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try (Connection connection = ConnectionFactory.createConnection(
- serializableConfiguration.get())) {
- Admin admin = connection.getAdmin();
- checkArgument(admin.tableExists(TableName.valueOf(tableId)),
- "Table %s does not exist", tableId);
- } catch (IOException e) {
- LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
+ public String getTableId() {
+ return tableId;
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("configuration",
- serializableConfiguration.get().toString()));
- builder.add(DisplayData.item("tableId", tableId));
- builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
- }
+ public Configuration getConfiguration() {
+ return serializableConfiguration.get();
+ }
- public String getTableId() {
- return tableId;
- }
+ /** Returns the range of keys that will be read from the table. */
+ public ByteKeyRange getKeyRange() {
+ byte[] startRow = serializableScan.get().getStartRow();
+ byte[] stopRow = serializableScan.get().getStopRow();
+ return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+ }
- public Configuration getConfiguration() {
- return serializableConfiguration.get();
- }
+ private final SerializableConfiguration serializableConfiguration;
+ private final String tableId;
+ private final SerializableScan serializableScan;
+ }
- /**
- * Returns the range of keys that will be read from the table.
- */
- public ByteKeyRange getKeyRange() {
- byte[] startRow = serializableScan.get().getStartRow();
- byte[] stopRow = serializableScan.get().getStopRow();
- return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
- }
+ static class HBaseSource extends BoundedSource<Result> {
+ private final Read read;
+ @Nullable private Long estimatedSizeBytes;
- private final SerializableConfiguration serializableConfiguration;
- private final String tableId;
- private final SerializableScan serializableScan;
+ HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
+ this.read = read;
+ this.estimatedSizeBytes = estimatedSizeBytes;
}
- static class HBaseSource extends BoundedSource<Result> {
- private final Read read;
- @Nullable private Long estimatedSizeBytes;
-
- HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
- this.read = read;
- this.estimatedSizeBytes = estimatedSizeBytes;
- }
+ HBaseSource withStartKey(ByteKey startKey) throws IOException {
+ checkNotNull(startKey, "startKey");
+ Read newRead =
+ new Read(
+ read.serializableConfiguration,
+ read.tableId,
+ new SerializableScan(
+ new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes())));
+ return new HBaseSource(newRead, estimatedSizeBytes);
+ }
- HBaseSource withStartKey(ByteKey startKey) throws IOException {
- checkNotNull(startKey, "startKey");
- Read newRead = new Read(read.serializableConfiguration, read.tableId,
- new SerializableScan(
- new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes())));
- return new HBaseSource(newRead, estimatedSizeBytes);
- }
+ HBaseSource withEndKey(ByteKey endKey) throws IOException {
+ checkNotNull(endKey, "endKey");
+ Read newRead =
+ new Read(
+ read.serializableConfiguration,
+ read.tableId,
+ new SerializableScan(
+ new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes())));
+ return new HBaseSource(newRead, estimatedSizeBytes);
+ }
- HBaseSource withEndKey(ByteKey endKey) throws IOException {
- checkNotNull(endKey, "endKey");
- Read newRead = new Read(read.serializableConfiguration, read.tableId,
- new SerializableScan(
- new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes())));
- return new HBaseSource(newRead, estimatedSizeBytes);
- }
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
+ if (estimatedSizeBytes == null) {
+ estimatedSizeBytes = estimateSizeBytes();
+ LOG.debug(
+ "Estimated size {} bytes for table {} and scan {}",
+ estimatedSizeBytes,
+ read.tableId,
+ read.serializableScan.get());
+ }
+ return estimatedSizeBytes;
+ }
- @Override
- public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
- if (estimatedSizeBytes == null) {
- estimatedSizeBytes = estimateSizeBytes();
- LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes,
- read.tableId, read.serializableScan.get());
+ /**
+ * This estimates the real size; it can be the compressed size, depending on the HBase
+ * configuration.
+ */
+ private long estimateSizeBytes() throws Exception {
+ // This code is based on RegionSizeCalculator in hbase-server
+ long estimatedSizeBytes = 0L;
+ Configuration configuration = this.read.serializableConfiguration.get();
+ try (Connection connection = ConnectionFactory.createConnection(configuration)) {
+ // filter regions for the given table/scan
+ List<HRegionLocation> regionLocations = getRegionLocations(connection);
+
+ // builds the set of regions that are part of the table scan
+ Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation regionLocation : regionLocations) {
+ tableRegions.add(regionLocation.getRegionInfo().getRegionName());
+ }
+
+ // calculate estimated size for the regions
+ Admin admin = connection.getAdmin();
+ ClusterStatus clusterStatus = admin.getClusterStatus();
+ Collection<ServerName> servers = clusterStatus.getServers();
+ for (ServerName serverName : servers) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+ for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+ byte[] regionId = regionLoad.getName();
+ if (tableRegions.contains(regionId)) {
+ long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L;
+ estimatedSizeBytes += regionSizeBytes;
}
- return estimatedSizeBytes;
+ }
}
+ }
+ return estimatedSizeBytes;
+ }
- /**
- * This estimates the real size, it can be the compressed size depending on the HBase
- * configuration.
- */
- private long estimateSizeBytes() throws Exception {
- // This code is based on RegionSizeCalculator in hbase-server
- long estimatedSizeBytes = 0L;
- Configuration configuration = this.read.serializableConfiguration.get();
- try (Connection connection = ConnectionFactory.createConnection(configuration)) {
- // filter regions for the given table/scan
- List<HRegionLocation> regionLocations = getRegionLocations(connection);
-
- // builds set of regions who are part of the table scan
- Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- for (HRegionLocation regionLocation : regionLocations) {
- tableRegions.add(regionLocation.getRegionInfo().getRegionName());
- }
-
- // calculate estimated size for the regions
- Admin admin = connection.getAdmin();
- ClusterStatus clusterStatus = admin.getClusterStatus();
- Collection<ServerName> servers = clusterStatus.getServers();
- for (ServerName serverName : servers) {
- ServerLoad serverLoad = clusterStatus.getLoad(serverName);
- for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
- byte[] regionId = regionLoad.getName();
- if (tableRegions.contains(regionId)) {
- long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L;
- estimatedSizeBytes += regionSizeBytes;
- }
- }
- }
- }
- return estimatedSizeBytes;
- }
+ private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
+ final Scan scan = read.serializableScan.get();
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
- private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
- final Scan scan = read.serializableScan.get();
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
-
- final List<HRegionLocation> regionLocations = new ArrayList<>();
-
- final boolean scanWithNoLowerBound = startRow.length == 0;
- final boolean scanWithNoUpperBound = stopRow.length == 0;
-
- TableName tableName = TableName.valueOf(read.tableId);
- RegionLocator regionLocator = connection.getRegionLocator(tableName);
- List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
- for (HRegionLocation regionLocation : tableRegionInfos) {
- final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
- final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
- boolean isLastRegion = endKey.length == 0;
- // filters regions who are part of the scan
- if ((scanWithNoLowerBound
- || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
- && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
- regionLocations.add(regionLocation);
- }
- }
+ final List<HRegionLocation> regionLocations = new ArrayList<>();
- return regionLocations;
- }
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
- private List<HBaseSource>
- splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits)
- throws Exception {
- final Scan scan = read.serializableScan.get();
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
-
- final List<HBaseSource> sources = new ArrayList<>(numSplits);
- final boolean scanWithNoLowerBound = startRow.length == 0;
- final boolean scanWithNoUpperBound = stopRow.length == 0;
-
- for (HRegionLocation regionLocation : regionLocations) {
- final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
- final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
- boolean isLastRegion = endKey.length == 0;
- String host = regionLocation.getHostnamePort();
-
- final byte[] splitStart = (scanWithNoLowerBound
- || Bytes.compareTo(startKey, startRow) >= 0) ? startKey : startRow;
- final byte[] splitStop =
- (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
- && !isLastRegion ? endKey : stopRow;
-
- LOG.debug("{} {} {} {} {}", sources.size(), host, read.tableId,
- Bytes.toString(splitStart), Bytes.toString(splitStop));
-
- // We need to create a new copy of the scan and read to add the new ranges
- Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
- Read newRead = new Read(read.serializableConfiguration, read.tableId,
- new SerializableScan(newScan));
- sources.add(new HBaseSource(newRead, estimatedSizeBytes));
- }
- return sources;
+ TableName tableName = TableName.valueOf(read.tableId);
+ RegionLocator regionLocator = connection.getRegionLocator(tableName);
+ List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
+ for (HRegionLocation regionLocation : tableRegionInfos) {
+ final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+ final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+ boolean isLastRegion = endKey.length == 0;
+ // filters regions that are part of the scan
+ if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
+ && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+ regionLocations.add(regionLocation);
}
+ }
+
+ return regionLocations;
+ }
+
+ private List<HBaseSource> splitBasedOnRegions(
+ List<HRegionLocation> regionLocations, int numSplits) throws Exception {
+ final Scan scan = read.serializableScan.get();
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ final List<HBaseSource> sources = new ArrayList<>(numSplits);
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+ for (HRegionLocation regionLocation : regionLocations) {
+ final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+ final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+ boolean isLastRegion = endKey.length == 0;
+ String host = regionLocation.getHostnamePort();
+
+ final byte[] splitStart =
+ (scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0)
+ ? startKey
+ : startRow;
+ final byte[] splitStop =
+ (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion
+ ? endKey
+ : stopRow;
+
+ LOG.debug(
+ "{} {} {} {} {}",
+ sources.size(),
+ host,
+ read.tableId,
+ Bytes.toString(splitStart),
+ Bytes.toString(splitStop));
+
+ // We need to create a new copy of the scan and read to add the new ranges
+ Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
+ Read newRead =
+ new Read(read.serializableConfiguration, read.tableId, new SerializableScan(newScan));
+ sources.add(new HBaseSource(newRead, estimatedSizeBytes));
+ }
+ return sources;
+ }
@Override
public List<? extends BoundedSource<Result>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
- long estimatedSizeBytes = getEstimatedSizeBytes(options);
- int numSplits = 1;
- if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) {
- numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
- }
-
- try (Connection connection = ConnectionFactory.createConnection(
- read.getConfiguration())) {
- List<HRegionLocation> regionLocations = getRegionLocations(connection);
- int realNumSplits =
- numSplits < regionLocations.size() ? regionLocations.size() : numSplits;
- LOG.debug("Suggested {} bundle(s) based on size", numSplits);
- LOG.debug("Suggested {} bundle(s) based on number of regions",
- regionLocations.size());
- final List<HBaseSource> sources = splitBasedOnRegions(regionLocations,
- realNumSplits);
- LOG.debug("Split into {} bundle(s)", sources.size());
- if (numSplits >= 1) {
- return sources;
- }
- return Collections.singletonList(this);
- }
- }
-
- @Override
- public BoundedReader<Result> createReader(PipelineOptions pipelineOptions)
- throws IOException {
- return new HBaseReader(this);
- }
-
- @Override
- public void validate() {
- read.validate(null /* input */);
- }
+ LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
+ long estimatedSizeBytes = getEstimatedSizeBytes(options);
+ int numSplits = 1;
+ if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) {
+ numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
+ }
+
+ try (Connection connection = ConnectionFactory.createConnection(read.getConfiguration())) {
+ List<HRegionLocation> regionLocations = getRegionLocations(connection);
+ int realNumSplits = numSplits < regionLocations.size() ? regionLocations.size() : numSplits;
+ LOG.debug("Suggested {} bundle(s) based on size", numSplits);
+ LOG.debug("Suggested {} bundle(s) based on number of regions", regionLocations.size());
+ final List<HBaseSource> sources = splitBasedOnRegions(regionLocations, realNumSplits);
+ LOG.debug("Split into {} bundle(s)", sources.size());
+ if (numSplits >= 1) {
+ return sources;
+ }
+ return Collections.singletonList(this);
+ }
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- read.populateDisplayData(builder);
- }
+ @Override
+ public BoundedReader<Result> createReader(PipelineOptions pipelineOptions) throws IOException {
+ return new HBaseReader(this);
+ }
- @Override
- public Coder<Result> getOutputCoder() {
- return HBaseResultCoder.of();
- }
+ @Override
+ public void validate() {
+ read.validate(null /* input */);
}
- private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
- private HBaseSource source;
- private Connection connection;
- private ResultScanner scanner;
- private Iterator<Result> iter;
- private Result current;
- private final ByteKeyRangeTracker rangeTracker;
- private long recordsReturned;
-
- HBaseReader(HBaseSource source) {
- this.source = source;
- Scan scan = source.read.serializableScan.get();
- ByteKeyRange range = ByteKeyRange
- .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow()));
- rangeTracker = ByteKeyRangeTracker.of(range);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ read.populateDisplayData(builder);
+ }
- @Override
- public boolean start() throws IOException {
- HBaseSource source = getCurrentSource();
- Configuration configuration = source.read.serializableConfiguration.get();
- String tableId = source.read.tableId;
- connection = ConnectionFactory.createConnection(configuration);
- TableName tableName = TableName.valueOf(tableId);
- Table table = connection.getTable(tableName);
- // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
- Scan scanClone = new Scan(source.read.serializableScan.get());
- scanner = table.getScanner(scanClone);
- iter = scanner.iterator();
- return advance();
- }
+ @Override
+ public Coder<Result> getOutputCoder() {
+ return HBaseResultCoder.of();
+ }
+ }
+
+ private static class HBaseReader extends BoundedSource.BoundedReader<Result> {
+ private HBaseSource source;
+ private Connection connection;
+ private ResultScanner scanner;
+ private Iterator<Result> iter;
+ private Result current;
+ private final ByteKeyRangeTracker rangeTracker;
+ private long recordsReturned;
+
+ HBaseReader(HBaseSource source) {
+ this.source = source;
+ Scan scan = source.read.serializableScan.get();
+ ByteKeyRange range =
+ ByteKeyRange.of(
+ ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow()));
+ rangeTracker = ByteKeyRangeTracker.of(range);
+ }
- @Override
- public Result getCurrent() throws NoSuchElementException {
- return current;
- }
+ @Override
+ public boolean start() throws IOException {
+ HBaseSource source = getCurrentSource();
+ Configuration configuration = source.read.serializableConfiguration.get();
+ String tableId = source.read.tableId;
+ connection = ConnectionFactory.createConnection(configuration);
+ TableName tableName = TableName.valueOf(tableId);
+ Table table = connection.getTable(tableName);
+ // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
+ Scan scanClone = new Scan(source.read.serializableScan.get());
+ scanner = table.getScanner(scanClone);
+ iter = scanner.iterator();
+ return advance();
+ }
- @Override
- public boolean advance() throws IOException {
- if (!iter.hasNext()) {
- return rangeTracker.markDone();
- }
- final Result next = iter.next();
- boolean hasRecord =
- rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
- || rangeTracker.markDone();
- if (hasRecord) {
- current = next;
- ++recordsReturned;
- }
- return hasRecord;
- }
+ @Override
+ public Result getCurrent() throws NoSuchElementException {
+ return current;
+ }
- @Override
- public void close() throws IOException {
- LOG.debug("Closing reader after reading {} records.", recordsReturned);
- if (scanner != null) {
- scanner.close();
- scanner = null;
- }
- if (connection != null) {
- connection.close();
- connection = null;
- }
- }
+ @Override
+ public boolean advance() throws IOException {
+ if (!iter.hasNext()) {
+ return rangeTracker.markDone();
+ }
+ final Result next = iter.next();
+ boolean hasRecord =
+ rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow()))
+ || rangeTracker.markDone();
+ if (hasRecord) {
+ current = next;
+ ++recordsReturned;
+ }
+ return hasRecord;
+ }
- @Override
- public synchronized HBaseSource getCurrentSource() {
- return source;
- }
+ @Override
+ public void close() throws IOException {
+ LOG.debug("Closing reader after reading {} records.", recordsReturned);
+ if (scanner != null) {
+ scanner.close();
+ scanner = null;
+ }
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ }
- @Override
- public final Double getFractionConsumed() {
- return rangeTracker.getFractionConsumed();
- }
+ @Override
+ public synchronized HBaseSource getCurrentSource() {
+ return source;
+ }
- @Override
- public final long getSplitPointsConsumed() {
- return rangeTracker.getSplitPointsConsumed();
- }
+ @Override
+ public final Double getFractionConsumed() {
+ return rangeTracker.getFractionConsumed();
+ }
- @Override
- @Nullable
- public final synchronized HBaseSource splitAtFraction(double fraction) {
- ByteKey splitKey;
- try {
- splitKey = rangeTracker.getRange().interpolateKey(fraction);
- } catch (RuntimeException e) {
- LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(),
- fraction, e);
- return null;
- }
- LOG.info(
- "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
- HBaseSource primary;
- HBaseSource residual;
- try {
- primary = source.withEndKey(splitKey);
- residual = source.withStartKey(splitKey);
- } catch (Exception e) {
- LOG.info(
- "{}: Interpolating for fraction {} yielded invalid split key {}.",
- rangeTracker.getRange(),
- fraction,
- splitKey,
- e);
- return null;
- }
- if (!rangeTracker.trySplitAtPosition(splitKey)) {
- return null;
- }
- this.source = primary;
- return residual;
- }
+ @Override
+ public final long getSplitPointsConsumed() {
+ return rangeTracker.getSplitPointsConsumed();
}
+ @Override
+ @Nullable
+ public final synchronized HBaseSource splitAtFraction(double fraction) {
+ ByteKey splitKey;
+ try {
+ splitKey = rangeTracker.getRange().interpolateKey(fraction);
+ } catch (RuntimeException e) {
+ LOG.info(
+ "{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), fraction, e);
+ return null;
+ }
+ LOG.info("Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+ HBaseSource primary;
+ HBaseSource residual;
+ try {
+ primary = source.withEndKey(splitKey);
+ residual = source.withStartKey(splitKey);
+ } catch (Exception e) {
+ LOG.info(
+ "{}: Interpolating for fraction {} yielded invalid split key {}.",
+ rangeTracker.getRange(),
+ fraction,
+ splitKey,
+ e);
+ return null;
+ }
+ if (!rangeTracker.trySplitAtPosition(splitKey)) {
+ return null;
+ }
+ this.source = primary;
+ return residual;
+ }
+ }
+
+ /**
+ * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be
+ * initialized with a {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies the
+ * destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId} that specifies
+ * which table to write to.
+ */
+ public static Write write() {
+ return new Write(null /* SerializableConfiguration */, "");
+ }
+
+ /**
+ * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for
+ * more information.
+ *
+ * @see HBaseIO
+ */
+ public static class Write extends PTransform<PCollection<Mutation>, PDone> {
/**
- * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be
- * initialized with a
- * {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies
- * the destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId}
- * that specifies which table to write.
+ * Returns a new {@link HBaseIO.Write} that will write to the HBase instance indicated by the
+ * given Configuration, and using any other specified customizations.
+ *
+ * <p>Does not modify this object.
*/
- public static Write write() {
- return new Write(null /* SerializableConfiguration */, "");
+ public Write withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "conf");
+ return new Write(new SerializableConfiguration(configuration), tableId);
}
/**
- * A {@link PTransform} that writes to HBase. See the class-level Javadoc on
- * {@link HBaseIO} for more information.
+ * Returns a new {@link HBaseIO.Write} that will write to the specified table.
*
- * @see HBaseIO
+ * <p>Does not modify this object.
*/
- public static class Write extends PTransform<PCollection<Mutation>, PDone> {
- /**
- * Returns a new {@link HBaseIO.Write} that will write to the HBase instance
- * indicated by the given Configuration, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Write withConfiguration(Configuration configuration) {
- checkNotNull(configuration, "conf");
- return new Write(new SerializableConfiguration(configuration), tableId);
- }
-
- /**
- * Returns a new {@link HBaseIO.Write} that will write to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Write(serializableConfiguration, tableId);
- }
-
- private Write(SerializableConfiguration serializableConfiguration, String tableId) {
- this.serializableConfiguration = serializableConfiguration;
- this.tableId = tableId;
- }
-
- @Override
- public PDone expand(PCollection<Mutation> input) {
- input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
- return PDone.in(input.getPipeline());
- }
-
- @Override
- public void validate(PipelineOptions options) {
- checkArgument(serializableConfiguration != null, "Configuration not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try (Connection connection = ConnectionFactory.createConnection(
- serializableConfiguration.get())) {
- Admin admin = connection.getAdmin();
- checkArgument(admin.tableExists(TableName.valueOf(tableId)),
- "Table %s does not exist", tableId);
- } catch (IOException e) {
- LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("configuration",
- serializableConfiguration.get().toString()));
- builder.add(DisplayData.item("tableId", tableId));
- }
-
- public String getTableId() {
- return tableId;
- }
-
- public Configuration getConfiguration() {
- return serializableConfiguration.get();
- }
-
- private final String tableId;
- private final SerializableConfiguration serializableConfiguration;
-
- private class HBaseWriterFn extends DoFn<Mutation, Void> {
-
- public HBaseWriterFn(String tableId,
- SerializableConfiguration serializableConfiguration) {
- this.tableId = checkNotNull(tableId, "tableId");
- this.serializableConfiguration = checkNotNull(serializableConfiguration,
- "serializableConfiguration");
- }
-
- @Setup
- public void setup() throws Exception {
- connection = ConnectionFactory.createConnection(serializableConfiguration.get());
- }
-
- @StartBundle
- public void startBundle(StartBundleContext c) throws IOException {
- BufferedMutatorParams params =
- new BufferedMutatorParams(TableName.valueOf(tableId));
- mutator = connection.getBufferedMutator(params);
- recordsWritten = 0;
- }
+ public Write withTableId(String tableId) {
+ checkNotNull(tableId, "tableId");
+ return new Write(serializableConfiguration, tableId);
+ }
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- mutator.mutate(c.element());
- ++recordsWritten;
- }
+ private Write(SerializableConfiguration serializableConfiguration, String tableId) {
+ this.serializableConfiguration = serializableConfiguration;
+ this.tableId = tableId;
+ }
- @FinishBundle
- public void finishBundle() throws Exception {
- mutator.flush();
- LOG.debug("Wrote {} records", recordsWritten);
- }
+ @Override
+ public PDone expand(PCollection<Mutation> input) {
+ input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
+ return PDone.in(input.getPipeline());
+ }
- @Teardown
- public void tearDown() throws Exception {
- if (mutator != null) {
- mutator.close();
- mutator = null;
- }
- if (connection != null) {
- connection.close();
- connection = null;
- }
- }
+ @Override
+ public void validate(PipelineOptions options) {
+ checkArgument(serializableConfiguration != null, "Configuration not specified");
+ checkArgument(!tableId.isEmpty(), "Table ID not specified");
+ try (Connection connection =
+ ConnectionFactory.createConnection(serializableConfiguration.get())) {
+ Admin admin = connection.getAdmin();
+ checkArgument(
+ admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+ }
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.this);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ }
- private final String tableId;
- private final SerializableConfiguration serializableConfiguration;
+ public String getTableId() {
+ return tableId;
+ }
- private Connection connection;
- private BufferedMutator mutator;
+ public Configuration getConfiguration() {
+ return serializableConfiguration.get();
+ }
- private long recordsWritten;
- }
+ private final String tableId;
+ private final SerializableConfiguration serializableConfiguration;
+
+ private class HBaseWriterFn extends DoFn<Mutation, Void> {
+
+ public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) {
+ this.tableId = checkNotNull(tableId, "tableId");
+ this.serializableConfiguration =
+ checkNotNull(serializableConfiguration, "serializableConfiguration");
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = ConnectionFactory.createConnection(serializableConfiguration.get());
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
+ BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableId));
+ mutator = connection.getBufferedMutator(params);
+ recordsWritten = 0;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ mutator.mutate(c.element());
+ ++recordsWritten;
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ mutator.flush();
+ LOG.debug("Wrote {} records", recordsWritten);
+ }
+
+ @Teardown
+ public void tearDown() throws Exception {
+ if (mutator != null) {
+ mutator.close();
+ mutator = null;
+ }
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Write.this);
+ }
+
+ private final String tableId;
+ private final SerializableConfiguration serializableConfiguration;
+
+ private Connection connection;
+ private BufferedMutator mutator;
+
+ private long recordsWritten;
}
+ }
}
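As a sanity check on the bundle math in split(...) and estimateSizeBytes() above — each region
contributes getStorefileSizeMB() * 1_048_576 bytes — here is a worked example with purely
illustrative numbers:

    // Illustrative inputs: a ~10 GiB table (e.g. 160 regions x 64 MB of store
    // files) and 256 MiB desired bundles.
    long estimatedSizeBytes = 10L * 1024 * 1024 * 1024;
    long desiredBundleSizeBytes = 256L * 1024 * 1024;
    int numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes); // 40
    // realNumSplits = max(numSplits, number of regions), so a table with more
    // regions than size-based splits still gets at least one bundle per region.
    int realNumSplits = Math.max(numSplits, 8); // 40 here; 64 if there were 64 regions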
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index ee83114..e7a36d5 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -71,30 +71,29 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
}
/**
- * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for
- * {@link Mutation mutations}.
+ * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for {@link Mutation
+ * mutations}.
*/
static CoderProvider getCoderProvider() {
return HBASE_MUTATION_CODER_PROVIDER;
}
private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER =
- new HBaseMutationCoderProvider();
+ new HBaseMutationCoderProvider();
- /**
- * A {@link CoderProvider} for {@link Mutation mutations}.
- */
+ /** A {@link CoderProvider} for {@link Mutation mutations}. */
private static class HBaseMutationCoderProvider extends CoderProvider {
@Override
- public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
- List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ public <T> Coder<T> coderFor(
+ TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException {
if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) {
throw new CannotProvideCoderException(
- String.format(
- "Cannot provide %s because %s is not a subclass of %s",
- HBaseMutationCoder.class.getSimpleName(),
- typeDescriptor,
- Mutation.class.getName()));
+ String.format(
+ "Cannot provide %s because %s is not a subclass of %s",
+ HBaseMutationCoder.class.getSimpleName(),
+ typeDescriptor,
+ Mutation.class.getName()));
}
try {
@@ -106,5 +105,5 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
}
private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR =
- new TypeDescriptor<Mutation>() {};
+ new TypeDescriptor<Mutation>() {};
}
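Since HBaseMutationCoder and its provider are package-private, the resolution path above can only
be exercised from org.apache.beam.sdk.io.hbase. A sketch, assumed to live inside a test method
declared as throws Exception:

    import java.util.Collections;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderProvider;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.hadoop.hbase.client.Put;

    // Put is a subtype of Mutation, so the provider resolves a coder for it;
    // a non-Mutation type would hit the CannotProvideCoderException branch above.
    CoderProvider provider = HBaseMutationCoder.getCoderProvider();
    Coder<Put> coder =
        provider.coderFor(TypeDescriptor.of(Put.class), Collections.emptyList());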
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 1d06635..bce1567 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,14 +41,12 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
}
@Override
- public void encode(Result value, OutputStream outputStream)
- throws IOException {
+ public void encode(Result value, OutputStream outputStream) throws IOException {
ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
}
@Override
- public Result decode(InputStream inputStream)
- throws IOException {
+ public Result decode(InputStream inputStream) throws IOException {
return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
}
}
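A round-trip sketch for HBaseResultCoder, again assuming same-package access and a method that
throws IOException; the cell contents are placeholders:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Collections;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    // Encode a one-cell Result in the delimited protobuf format and decode it back.
    Cell cell =
        new KeyValue(
            Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    Result original = Result.create(Collections.singletonList(cell));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    HBaseResultCoder.of().encode(original, out);
    Result decoded = HBaseResultCoder.of().decode(new ByteArrayInputStream(out.toByteArray()));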
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
index f3bc7ac..6ed3c51 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
@@ -25,31 +25,28 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-/**
- * This is just a wrapper class to serialize HBase {@link Scan} using Protobuf.
- */
+/** This is just a wrapper class to serialize HBase {@link Scan} using Protobuf. */
class SerializableScan implements Serializable {
- private transient Scan scan;
+ private transient Scan scan;
- public SerializableScan() {
- }
+ public SerializableScan() {}
- public SerializableScan(Scan scan) {
- if (scan == null) {
- throw new NullPointerException("Scan must not be null.");
- }
- this.scan = scan;
+ public SerializableScan(Scan scan) {
+ if (scan == null) {
+ throw new NullPointerException("Scan must not be null.");
}
+ this.scan = scan;
+ }
- private void writeObject(ObjectOutputStream out) throws IOException {
- ProtobufUtil.toScan(scan).writeDelimitedTo(out);
- }
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ ProtobufUtil.toScan(scan).writeDelimitedTo(out);
+ }
- private void readObject(ObjectInputStream in) throws IOException {
- scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
- }
+ private void readObject(ObjectInputStream in) throws IOException {
+ scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
+ }
- public Scan get() {
- return scan;
- }
+ public Scan get() {
+ return scan;
+ }
}
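And a Java-serialization round trip for SerializableScan, showing what the protobuf-backed
writeObject/readObject above provide (same-package, inside a method that throws Exception; the row
bounds are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    // Scan itself is not Serializable; the wrapper writes it as a delimited protobuf.
    Scan scan = new Scan().setStartRow(Bytes.toBytes("a")).setStopRow(Bytes.toBytes("m"));
    SerializableScan wrapped = new SerializableScan(scan);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(wrapped);
    oos.flush();
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
    Scan restored = ((SerializableScan) in.readObject()).get(); // same start/stop rows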
http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
index 5b2e138..25369fc 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -26,9 +26,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for {@link HBaseCoderProviderRegistrar}.
- */
+/** Tests for {@link HBaseCoderProviderRegistrar}. */
@RunWith(JUnit4.class)
public class HBaseCoderProviderRegistrarTest {
@Test