You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/10/03 07:42:17 UTC

[1/2] beam git commit: Fix code style issues for HBaseIO

Repository: beam
Updated Branches:
  refs/heads/master 4a5b3c052 -> 81d304d4b


Fix code style issues for HBaseIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd39e7bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd39e7bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd39e7bd

Branch: refs/heads/master
Commit: bd39e7bdcfacfef5bb64e2132ba5d2abf50ce99b
Parents: 4a5b3c0
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Sat Sep 30 09:39:28 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 3 09:41:45 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 200 +++++++++----------
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  14 +-
 2 files changed, 95 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd39e7bd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 393402a..bcdaefa 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -159,69 +159,53 @@ public class HBaseIO {
     return new Read(null, "", new SerializableScan(new Scan()));
   }
 
-    /**
-     * A {@link PTransform} that reads from HBase. See the class-level Javadoc on
-      {@link HBaseIO} for* more information.
-     *
-     * @see HBaseIO
-     */
-    public static class Read extends PTransform<PBegin, PCollection<Result>> {
-        /**
-         Reads from the HBase instance
-          indicated by the* given configuration.*/
-
-        public Read withConfiguration(Configuration configuration) {
-            checkArgument(configuration != null, "configuration can not be null");
-            return new Read(new SerializableConfiguration(configuration),
-                    tableId, serializableScan);
-        }
-
-        /**
-         Reads from the specified table.*/
-
-        public Read withTableId(String tableId) {
-            checkArgument(tableId != null, "tableIdcan not be null");
-            return new Read(serializableConfiguration, tableId, serializableScan);
-        }
-
-        /**
-         Filters the rows read from HBase
-          using the given* scan.*/
-
-        public Read withScan(Scan scan) {
-            checkArgument(scan != null, "scancan not be null");
-            return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
-        }
-
-        /**
-         Filters the rows read from HBase
-          using the given* row filter.*/
+  /**
+   * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for
+   * more information.
+   *
+   * @see HBaseIO
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Result>> {
+    /** Reads from the HBase instance indicated by the given configuration. */
+    public Read withConfiguration(Configuration configuration) {
+      checkArgument(configuration != null, "configuration can not be null");
+      return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
+    }
 
-        public Read withFilter(Filter filter) {
-            checkArgument(filter != null, "filtercan not be null");
-            return withScan(serializableScan.get().setFilter(filter));
-        }
+    /** Reads from the specified table. */
+    public Read withTableId(String tableId) {
+      checkArgument(tableId != null, "tableIdcan not be null");
+      return new Read(serializableConfiguration, tableId, serializableScan);
+    }
 
-        /**
-         Reads only rows in the specified range.*/
+    /** Filters the rows read from HBase using the given scan. */
+    public Read withScan(Scan scan) {
+      checkArgument(scan != null, "scancan not be null");
+      return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
+    }
 
-        public Read withKeyRange(ByteKeyRange keyRange) {
-            checkArgument(keyRange != null, "keyRangecan not be null");
-            byte[] startRow = keyRange.getStartKey().getBytes();
-            byte[] stopRow = keyRange.getEndKey().getBytes();
-            return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
-        }
+    /** Filters the rows read from HBase using the given row filter. */
+    public Read withFilter(Filter filter) {
+      checkArgument(filter != null, "filtercan not be null");
+      return withScan(serializableScan.get().setFilter(filter));
+    }
 
-        /**
-         Reads only rows in the specified range.*/
+    /** Reads only rows in the specified range. */
+    public Read withKeyRange(ByteKeyRange keyRange) {
+      checkArgument(keyRange != null, "keyRangecan not be null");
+      byte[] startRow = keyRange.getStartKey().getBytes();
+      byte[] stopRow = keyRange.getEndKey().getBytes();
+      return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
+    }
 
-        public Read withKeyRange(byte[] startRow, byte[] stopRow) {
-            checkArgument(startRow != null, "startRowcan not be null");
-            checkArgument(stopRow != null, "stopRowcan not be null");
-            ByteKeyRange keyRange =
-                    ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
-            return withKeyRange(keyRange);
-        }
+    /** Reads only rows in the specified range. */
+    public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+      checkArgument(startRow != null, "startRowcan not be null");
+      checkArgument(stopRow != null, "stopRowcan not be null");
+      ByteKeyRange keyRange =
+          ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+      return withKeyRange(keyRange);
+    }
 
     private Read(
         SerializableConfiguration serializableConfiguration,
@@ -232,22 +216,21 @@ public class HBaseIO {
       this.serializableScan = serializableScan;
     }
 
-        @Override
-        public PCollection<Result> expand(PBegin input) {
-            checkArgument(serializableConfiguration != null,
-                    "withConfiguration() is required");
-            checkArgument(!tableId.isEmpty(), "withTableId() is required");
-            try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.get())) {
-                Admin admin = connection.getAdmin();
-                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
-                        "Table %s does not exist", tableId);
-            } catch (IOException e) {
-                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-            }
-            HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
-                return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
-        }
+    @Override
+    public PCollection<Result> expand(PBegin input) {
+      checkArgument(serializableConfiguration != null, "withConfiguration() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() is required");
+      try (Connection connection =
+          ConnectionFactory.createConnection(serializableConfiguration.get())) {
+        Admin admin = connection.getAdmin();
+        checkArgument(
+            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+      } catch (IOException e) {
+        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+      HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
+      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+    }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
@@ -597,50 +580,45 @@ public class HBaseIO {
     return new Write(null /* SerializableConfiguration */, "");
   }
 
-    /**
-     * A {@link PTransform} that writes to HBase. See the class-level Javadoc on
-      {@link HBaseIO} for* more information.
-     *
-     * @see HBaseIO
-     */
-    public static class Write extends PTransform<PCollection<Mutation>, PDone> {
-        /**
-         Writes to the HBase instance
-          indicated by the* given Configuration.
-         */
-        public Write withConfiguration(Configuration configuration) {
-            checkArgument(configuration != null, "configuration can not be null");
-            return new Write(new SerializableConfiguration(configuration), tableId);
-        }
-
-        /**
-         Writes to the specified table.*/
+  /**
+   * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for
+   * more information.
+   *
+   * @see HBaseIO
+   */
+  public static class Write extends PTransform<PCollection<Mutation>, PDone> {
+    /** Writes to the HBase instance indicated by the given Configuration. */
+    public Write withConfiguration(Configuration configuration) {
+      checkArgument(configuration != null, "configuration can not be null");
+      return new Write(new SerializableConfiguration(configuration), tableId);
+    }
 
-        public Write withTableId(String tableId) {
-            checkArgument(tableId != null, "tableIdcan not be null");
-            return new Write(serializableConfiguration, tableId);
-        }
+    /** Writes to the specified table. */
+    public Write withTableId(String tableId) {
+      checkArgument(tableId != null, "tableIdcan not be null");
+      return new Write(serializableConfiguration, tableId);
+    }
 
     private Write(SerializableConfiguration serializableConfiguration, String tableId) {
       this.serializableConfiguration = serializableConfiguration;
       this.tableId = tableId;
     }
 
-        @Override
-        public PDone expand(PCollection<Mutation> input) {
-            checkArgument(serializableConfiguration != null, "withConfiguration() is required");
-            checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
-            try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.get())) {
-                Admin admin = connection.getAdmin();
-                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
-                        "Table %s does not exist", tableId);
-            } catch (IOException e) {
-                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-            }
-            input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
-            return PDone.in(input.getPipeline());
-        }
+    @Override
+    public PDone expand(PCollection<Mutation> input) {
+      checkArgument(serializableConfiguration != null, "withConfiguration() is required");
+      checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
+      try (Connection connection =
+          ConnectionFactory.createConnection(serializableConfiguration.get())) {
+        Admin admin = connection.getAdmin();
+        checkArgument(
+            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
+      } catch (IOException e) {
+        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+      input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
+      return PDone.in(input.getPipeline());
+    }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {

http://git-wip-us.apache.org/repos/asf/beam/blob/bd39e7bd/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 73ba64b..fd42024 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -355,14 +355,12 @@ public class HBaseIOTest {
   public void testWritingFailsTableDoesNotExist() throws Exception {
     final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-
-
-        // Exception will be thrown by write.expand() when write is applied.
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage(String.format("Table %s does not exist", table));
-        p.apply(Create.empty(HBaseMutationCoder.of()))
-         .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
-    }
+    // Exception will be thrown by write.expand() when write is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+    p.apply(Create.empty(HBaseMutationCoder.of()))
+        .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+  }
 
   /** Tests that when writing an element fails, the write fails. */
   @Test


[2/2] beam git commit: This closes #3932

Posted by ie...@apache.org.
This closes #3932


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81d304d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81d304d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81d304d4

Branch: refs/heads/master
Commit: 81d304d4b79aab3d1f6102f244ecec8caf9cb2e8
Parents: 4a5b3c0 bd39e7b
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Tue Oct 3 09:41:57 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 3 09:41:57 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 200 +++++++++----------
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  14 +-
 2 files changed, 95 insertions(+), 119 deletions(-)
----------------------------------------------------------------------