You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/10/03 07:42:17 UTC
[1/2] beam git commit: Fix code style issues for HBaseIO
Repository: beam
Updated Branches:
refs/heads/master 4a5b3c052 -> 81d304d4b
Fix code style issues for HBaseIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd39e7bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd39e7bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd39e7bd
Branch: refs/heads/master
Commit: bd39e7bdcfacfef5bb64e2132ba5d2abf50ce99b
Parents: 4a5b3c0
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Sat Sep 30 09:39:28 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 3 09:41:45 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 200 +++++++++----------
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 14 +-
2 files changed, 95 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bd39e7bd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 393402a..bcdaefa 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -159,69 +159,53 @@ public class HBaseIO {
return new Read(null, "", new SerializableScan(new Scan()));
}
- /**
- * A {@link PTransform} that reads from HBase. See the class-level Javadoc on
- {@link HBaseIO} for* more information.
- *
- * @see HBaseIO
- */
- public static class Read extends PTransform<PBegin, PCollection<Result>> {
- /**
- Reads from the HBase instance
- indicated by the* given configuration.*/
-
- public Read withConfiguration(Configuration configuration) {
- checkArgument(configuration != null, "configuration can not be null");
- return new Read(new SerializableConfiguration(configuration),
- tableId, serializableScan);
- }
-
- /**
- Reads from the specified table.*/
-
- public Read withTableId(String tableId) {
- checkArgument(tableId != null, "tableIdcan not be null");
- return new Read(serializableConfiguration, tableId, serializableScan);
- }
-
- /**
- Filters the rows read from HBase
- using the given* scan.*/
-
- public Read withScan(Scan scan) {
- checkArgument(scan != null, "scancan not be null");
- return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
- }
-
- /**
- Filters the rows read from HBase
- using the given* row filter.*/
+ /**
+ * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for*
+ * more information.
+ *
+ * @see HBaseIO
+ */
+ public static class Read extends PTransform<PBegin, PCollection<Result>> {
+ /** Reads from the HBase instance indicated by the* given configuration. */
+ public Read withConfiguration(Configuration configuration) {
+ checkArgument(configuration != null, "configuration can not be null");
+ return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
+ }
- public Read withFilter(Filter filter) {
- checkArgument(filter != null, "filtercan not be null");
- return withScan(serializableScan.get().setFilter(filter));
- }
+ /** Reads from the specified table. */
+ public Read withTableId(String tableId) {
+ checkArgument(tableId != null, "tableIdcan not be null");
+ return new Read(serializableConfiguration, tableId, serializableScan);
+ }
- /**
- Reads only rows in the specified range.*/
+ /** Filters the rows read from HBase using the given* scan. */
+ public Read withScan(Scan scan) {
+ checkArgument(scan != null, "scancan not be null");
+ return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
+ }
- public Read withKeyRange(ByteKeyRange keyRange) {
- checkArgument(keyRange != null, "keyRangecan not be null");
- byte[] startRow = keyRange.getStartKey().getBytes();
- byte[] stopRow = keyRange.getEndKey().getBytes();
- return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
- }
+ /** Filters the rows read from HBase using the given* row filter. */
+ public Read withFilter(Filter filter) {
+ checkArgument(filter != null, "filtercan not be null");
+ return withScan(serializableScan.get().setFilter(filter));
+ }
- /**
- Reads only rows in the specified range.*/
+ /** Reads only rows in the specified range. */
+ public Read withKeyRange(ByteKeyRange keyRange) {
+ checkArgument(keyRange != null, "keyRangecan not be null");
+ byte[] startRow = keyRange.getStartKey().getBytes();
+ byte[] stopRow = keyRange.getEndKey().getBytes();
+ return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
+ }
- public Read withKeyRange(byte[] startRow, byte[] stopRow) {
- checkArgument(startRow != null, "startRowcan not be null");
- checkArgument(stopRow != null, "stopRowcan not be null");
- ByteKeyRange keyRange =
- ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
- return withKeyRange(keyRange);
- }
+ /** Reads only rows in the specified range. */
+ public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+ checkArgument(startRow != null, "startRowcan not be null");
+ checkArgument(stopRow != null, "stopRowcan not be null");
+ ByteKeyRange keyRange =
+ ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+ return withKeyRange(keyRange);
+ }
private Read(
SerializableConfiguration serializableConfiguration,
@@ -232,22 +216,21 @@ public class HBaseIO {
this.serializableScan = serializableScan;
}
- @Override
- public PCollection<Result> expand(PBegin input) {
- checkArgument(serializableConfiguration != null,
- "withConfiguration() is required");
- checkArgument(!tableId.isEmpty(), "withTableId() is required");
- try (Connection connection = ConnectionFactory.createConnection(
- serializableConfiguration.get())) {
- Admin admin = connection.getAdmin();
- checkArgument(admin.tableExists(TableName.valueOf(tableId)),
- "Table %s does not exist", tableId);
- } catch (IOException e) {
- LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
- return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
- }
+ @Override
+ public PCollection<Result> expand(PBegin input) {
+ checkArgument(serializableConfiguration != null, "withConfiguration() is required");
+ checkArgument(!tableId.isEmpty(), "withTableId() is required");
+ try (Connection connection =
+ ConnectionFactory.createConnection(serializableConfiguration.get())) {
+ Admin admin = connection.getAdmin();
+ checkArgument(
+ admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+ }
+ HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
+ return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+ }
@Override
public void populateDisplayData(DisplayData.Builder builder) {
@@ -597,50 +580,45 @@ public class HBaseIO {
return new Write(null /* SerializableConfiguration */, "");
}
- /**
- * A {@link PTransform} that writes to HBase. See the class-level Javadoc on
- {@link HBaseIO} for* more information.
- *
- * @see HBaseIO
- */
- public static class Write extends PTransform<PCollection<Mutation>, PDone> {
- /**
- Writes to the HBase instance
- indicated by the* given Configuration.
- */
- public Write withConfiguration(Configuration configuration) {
- checkArgument(configuration != null, "configuration can not be null");
- return new Write(new SerializableConfiguration(configuration), tableId);
- }
-
- /**
- Writes to the specified table.*/
+ /**
+ * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for*
+ * more information.
+ *
+ * @see HBaseIO
+ */
+ public static class Write extends PTransform<PCollection<Mutation>, PDone> {
+ /** Writes to the HBase instance indicated by the* given Configuration. */
+ public Write withConfiguration(Configuration configuration) {
+ checkArgument(configuration != null, "configuration can not be null");
+ return new Write(new SerializableConfiguration(configuration), tableId);
+ }
- public Write withTableId(String tableId) {
- checkArgument(tableId != null, "tableIdcan not be null");
- return new Write(serializableConfiguration, tableId);
- }
+ /** Writes to the specified table. */
+ public Write withTableId(String tableId) {
+ checkArgument(tableId != null, "tableIdcan not be null");
+ return new Write(serializableConfiguration, tableId);
+ }
private Write(SerializableConfiguration serializableConfiguration, String tableId) {
this.serializableConfiguration = serializableConfiguration;
this.tableId = tableId;
}
- @Override
- public PDone expand(PCollection<Mutation> input) {
- checkArgument(serializableConfiguration != null, "withConfiguration() is required");
- checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
- try (Connection connection = ConnectionFactory.createConnection(
- serializableConfiguration.get())) {
- Admin admin = connection.getAdmin();
- checkArgument(admin.tableExists(TableName.valueOf(tableId)),
- "Table %s does not exist", tableId);
- } catch (IOException e) {
- LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
- return PDone.in(input.getPipeline());
- }
+ @Override
+ public PDone expand(PCollection<Mutation> input) {
+ checkArgument(serializableConfiguration != null, "withConfiguration() is required");
+ checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
+ try (Connection connection =
+ ConnectionFactory.createConnection(serializableConfiguration.get())) {
+ Admin admin = connection.getAdmin();
+ checkArgument(
+ admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+ } catch (IOException e) {
+ LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+ }
+ input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
+ return PDone.in(input.getPipeline());
+ }
@Override
public void populateDisplayData(DisplayData.Builder builder) {
http://git-wip-us.apache.org/repos/asf/beam/blob/bd39e7bd/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 73ba64b..fd42024 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -355,14 +355,12 @@ public class HBaseIOTest {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE-DOES-NOT-EXIST";
-
-
- // Exception will be thrown by write.expand() when write is applied.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(String.format("Table %s does not exist", table));
- p.apply(Create.empty(HBaseMutationCoder.of()))
- .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
- }
+ // Exception will be thrown by write.expand() when write is applied.
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(String.format("Table %s does not exist", table));
+ p.apply(Create.empty(HBaseMutationCoder.of()))
+ .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+ }
/** Tests that when writing an element fails, the write fails. */
@Test
[2/2] beam git commit: This closes #3932
Posted by ie...@apache.org.
This closes #3932
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81d304d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81d304d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81d304d4
Branch: refs/heads/master
Commit: 81d304d4b79aab3d1f6102f244ecec8caf9cb2e8
Parents: 4a5b3c0 bd39e7b
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Tue Oct 3 09:41:57 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 3 09:41:57 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 200 +++++++++----------
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 14 +-
2 files changed, 95 insertions(+), 119 deletions(-)
----------------------------------------------------------------------