You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/20 09:49:54 UTC

[1/3] beam git commit: [BEAM-2411] Make the write transform of HBaseIO simpler

Repository: beam
Updated Branches:
  refs/heads/master 2304972c5 -> eae0d05bd


[BEAM-2411] Make the write transform of HBaseIO simpler


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ec59a08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ec59a08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ec59a08

Branch: refs/heads/master
Commit: 1ec59a08a3fab5ac0918d7f1a33b82427957b630
Parents: 2304972
Author: Ismaël Mejía <ie...@apache.org>
Authored: Mon Jun 5 23:48:38 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 09:04:00 2017 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 45 +++++++-------------
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 37 +++++++---------
 2 files changed, 32 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 849873c..626fad9 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -31,10 +31,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.range.ByteKey;
@@ -44,7 +41,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -122,16 +118,15 @@ import org.slf4j.LoggerFactory;
  * <h3>Writing to HBase</h3>
  *
  * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;KV&lt;byte[], Iterable&lt;Mutation&gt;&gt;&gt;}, where the
- * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
+ * {@link PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an
+ * idempotent transformation on a row.
  *
  * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
  * to identify the HBase instance, for example:
  *
  * <pre>{@code
  * Configuration configuration = ...;
- * PCollection<KV<byte[], Iterable<Mutation>>> data = ...;
+ * PCollection<Mutation> data = ...;
  * data.setCoder(HBaseIO.WRITE_CODER);
  *
  * data.apply("write",
@@ -545,9 +540,7 @@ public class HBaseIO {
      *
      * @see HBaseIO
      */
-    public static class Write
-            extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> {
-
+    public static class Write extends PTransform<PCollection<Mutation>, PDone> {
         /**
          * Returns a new {@link HBaseIO.Write} that will write to the HBase instance
          * indicated by the given Configuration, and using any other specified customizations.
@@ -575,7 +568,7 @@ public class HBaseIO {
         }
 
         @Override
-        public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) {
+        public PDone expand(PCollection<Mutation> input) {
             input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
             return PDone.in(input.getPipeline());
         }
@@ -613,7 +606,7 @@ public class HBaseIO {
         private final String tableId;
         private final SerializableConfiguration serializableConfiguration;
 
-        private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> {
+        private class HBaseWriterFn extends DoFn<Mutation, Void> {
 
             public HBaseWriterFn(String tableId,
                                  SerializableConfiguration serializableConfiguration) {
@@ -624,31 +617,27 @@ public class HBaseIO {
 
             @Setup
             public void setup() throws Exception {
-                Configuration configuration = this.serializableConfiguration.get();
-                connection = ConnectionFactory.createConnection(configuration);
+                connection = ConnectionFactory.createConnection(serializableConfiguration.get());
+            }
 
-                TableName tableName = TableName.valueOf(tableId);
+            @StartBundle
+            public void startBundle(StartBundleContext c) throws IOException {
                 BufferedMutatorParams params =
-                    new BufferedMutatorParams(tableName);
+                    new BufferedMutatorParams(TableName.valueOf(tableId));
                 mutator = connection.getBufferedMutator(params);
-
                 recordsWritten = 0;
             }
 
             @ProcessElement
-            public void processElement(ProcessContext ctx) throws Exception {
-                KV<byte[], Iterable<Mutation>> record = ctx.element();
-                List<Mutation> mutations = new ArrayList<>();
-                for (Mutation mutation : record.getValue()) {
-                    mutations.add(mutation);
-                    ++recordsWritten;
-                }
-                mutator.mutate(mutations);
+            public void processElement(ProcessContext c) throws Exception {
+                mutator.mutate(c.element());
+                ++recordsWritten;
             }
 
             @FinishBundle
             public void finishBundle() throws Exception {
                 mutator.flush();
+                LOG.debug("Wrote {} records", recordsWritten);
             }
 
             @Teardown
@@ -661,7 +650,6 @@ public class HBaseIO {
                     connection.close();
                     connection = null;
                 }
-                LOG.debug("Wrote {} records", recordsWritten);
             }
 
             @Override
@@ -679,6 +667,5 @@ public class HBaseIO {
         }
     }
 
-    public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER =
-            KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
+    public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 005770d..d081139 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -292,15 +291,17 @@ public class HBaseIOTest {
         final String table = "table";
         final String key = "key";
         final String value = "value";
+        final int numMutations = 100;
 
         createTable(table);
 
-        p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER))
-                .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
+            .withCoder(HBaseIO.WRITE_CODER))
+         .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
         p.run().waitUntilFinish();
 
         List<Result> results = readTable(table, new Scan());
-        assertEquals(1, results.size());
+        assertEquals(numMutations, results.size());
     }
 
     /** Tests that when writing to a non-existent table, the write fails. */
@@ -308,10 +309,8 @@ public class HBaseIOTest {
     public void testWritingFailsTableDoesNotExist() throws Exception {
         final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-        PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
-                p.apply(Create.empty(HBaseIO.WRITE_CODER));
-
-        emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply(Create.empty(HBaseIO.WRITE_CODER))
+         .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         // Exception will be thrown by write.validate() when write is applied.
         thrown.expect(IllegalArgumentException.class);
@@ -326,7 +325,7 @@ public class HBaseIOTest {
         final String key = "KEY";
         createTable(table);
 
-        p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
+        p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
                 .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         thrown.expect(Pipeline.PipelineExecutionException.class);
@@ -405,26 +404,22 @@ public class HBaseIOTest {
 
     // Beam helper methods
     /** Helper function to make a single row mutation to be written. */
-    private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
-        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+    private static Iterable<Mutation> makeMutations(String key, String value, int numMutations) {
         List<Mutation> mutations = new ArrayList<>();
-        mutations.add(makeMutation(key, value));
-        return KV.of(rowKey, (Iterable<Mutation>) mutations);
+        for (int i = 0; i < numMutations; i++) {
+            mutations.add(makeMutation(key + i, value));
+        }
+        return mutations;
     }
 
-
     private static Mutation makeMutation(String key, String value) {
-        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
-        return new Put(rowKey)
+        return new Put(key.getBytes(StandardCharsets.UTF_8))
                     .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
                     .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
     }
 
-    private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
-        Put put = new Put(key.getBytes());
-        List<Mutation> mutations = new ArrayList<>();
-        mutations.add(put);
-        return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations);
+    private static Mutation makeBadMutation(String key) {
+        return new Put(key.getBytes());
     }
 
     private void runReadTest(HBaseIO.Read read, List<Result> expected) {


[2/3] beam git commit: [BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference

Posted by ie...@apache.org.
[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d42f6333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d42f6333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d42f6333

Branch: refs/heads/master
Commit: d42f6333141e85964d009110d8bea85ad4763632
Parents: 1ec59a0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 10:00:15 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 10:00:15 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/hbase/pom.xml                      |  6 +++
 .../io/hbase/HBaseCoderProviderRegistrar.java   | 49 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |  3 --
 .../hbase/HBaseCoderProviderRegistrarTest.java  | 41 ++++++++++++++++
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  9 ++--
 5 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f81cd24..4d9d600 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -64,6 +64,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-shaded-client</artifactId>
       <version>${hbase.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
new file mode 100644
index 0000000..dee3c70
--- /dev/null
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+      CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 626fad9..c9afe89 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code
  * Configuration configuration = ...;
  * PCollection<Mutation> data = ...;
- * data.setCoder(HBaseIO.WRITE_CODER);
  *
  * data.apply("write",
  *     HBaseIO.write()
@@ -666,6 +665,4 @@ public class HBaseIO {
             private long recordsWritten;
         }
     }
-
-    public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..ac81e8a
--- /dev/null
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HBaseCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HBaseCoderProviderRegistrarTest {
+  @Test
+  public void testResultCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(Result.class);
+  }
+
+  @Test
+  public void testMutationCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(Mutation.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index d081139..806a27f 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -295,8 +295,7 @@ public class HBaseIOTest {
 
         createTable(table);
 
-        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
-            .withCoder(HBaseIO.WRITE_CODER))
+        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
          .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
         p.run().waitUntilFinish();
 
@@ -309,7 +308,7 @@ public class HBaseIOTest {
     public void testWritingFailsTableDoesNotExist() throws Exception {
         final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-        p.apply(Create.empty(HBaseIO.WRITE_CODER))
+        p.apply(Create.empty(HBaseMutationCoder.of()))
          .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         // Exception will be thrown by write.validate() when write is applied.
@@ -325,8 +324,8 @@ public class HBaseIOTest {
         final String key = "KEY";
         createTable(table);
 
-        p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
-                .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply(Create.of(makeBadMutation(key)))
+         .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         thrown.expect(Pipeline.PipelineExecutionException.class);
         thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));


[3/3] beam git commit: This closes #3391

Posted by ie...@apache.org.
This closes #3391


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eae0d05b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eae0d05b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eae0d05b

Branch: refs/heads/master
Commit: eae0d05bd7c088accd927dcfe3e511efbb11c9fd
Parents: 2304972 d42f633
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 11:49:25 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 11:49:25 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/hbase/pom.xml                      |  6 +++
 .../io/hbase/HBaseCoderProviderRegistrar.java   | 49 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 46 ++++++------------
 .../hbase/HBaseCoderProviderRegistrarTest.java  | 41 ++++++++++++++++
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 38 +++++++--------
 5 files changed, 127 insertions(+), 53 deletions(-)
----------------------------------------------------------------------