You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/20 09:49:55 UTC

[2/3] beam git commit: [BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference

[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d42f6333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d42f6333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d42f6333

Branch: refs/heads/master
Commit: d42f6333141e85964d009110d8bea85ad4763632
Parents: 1ec59a0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 10:00:15 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 10:00:15 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/hbase/pom.xml                      |  6 +++
 .../io/hbase/HBaseCoderProviderRegistrar.java   | 49 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |  3 --
 .../hbase/HBaseCoderProviderRegistrarTest.java  | 41 ++++++++++++++++
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  9 ++--
 5 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f81cd24..4d9d600 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -64,6 +64,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-shaded-client</artifactId>
       <version>${hbase.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
new file mode 100644
index 0000000..dee3c70
--- /dev/null
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+      CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()),
+      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 626fad9..c9afe89 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code
  * Configuration configuration = ...;
  * PCollection<Mutation> data = ...;
- * data.setCoder(HBaseIO.WRITE_CODER);
  *
  * data.apply("write",
  *     HBaseIO.write()
@@ -666,6 +665,4 @@ public class HBaseIO {
             private long recordsWritten;
         }
     }
-
-    public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..ac81e8a
--- /dev/null
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HBaseCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HBaseCoderProviderRegistrarTest {
+  @Test
+  public void testResultCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(Result.class);
+  }
+
+  @Test
+  public void testMutationCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(Mutation.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index d081139..806a27f 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -295,8 +295,7 @@ public class HBaseIOTest {
 
         createTable(table);
 
-        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
-            .withCoder(HBaseIO.WRITE_CODER))
+        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
          .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
         p.run().waitUntilFinish();
 
@@ -309,7 +308,7 @@ public class HBaseIOTest {
     public void testWritingFailsTableDoesNotExist() throws Exception {
         final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-        p.apply(Create.empty(HBaseIO.WRITE_CODER))
+        p.apply(Create.empty(HBaseMutationCoder.of()))
          .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         // Exception will be thrown by write.validate() when write is applied.
@@ -325,8 +324,8 @@ public class HBaseIOTest {
         final String key = "KEY";
         createTable(table);
 
-        p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
-                .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply(Create.of(makeBadMutation(key)))
+         .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         thrown.expect(Pipeline.PipelineExecutionException.class);
         thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));