You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/20 09:49:55 UTC
[2/3] beam git commit: [BEAM-2411] Add HBaseCoderProviderRegistrar
for better coder inference
[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d42f6333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d42f6333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d42f6333
Branch: refs/heads/master
Commit: d42f6333141e85964d009110d8bea85ad4763632
Parents: 1ec59a0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 10:00:15 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 10:00:15 2017 +0200
----------------------------------------------------------------------
sdks/java/io/hbase/pom.xml | 6 +++
.../io/hbase/HBaseCoderProviderRegistrar.java | 49 ++++++++++++++++++++
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 3 --
.../hbase/HBaseCoderProviderRegistrarTest.java | 41 ++++++++++++++++
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 9 ++--
5 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f81cd24..4d9d600 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -64,6 +64,12 @@
</dependency>
<dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>${hbase.version}</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
new file mode 100644
index 0000000..dee3c70
--- /dev/null
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 626fad9..c9afe89 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory;
* <pre>{@code
* Configuration configuration = ...;
* PCollection<Mutation> data = ...;
- * data.setCoder(HBaseIO.WRITE_CODER);
*
* data.apply("write",
* HBaseIO.write()
@@ -666,6 +665,4 @@ public class HBaseIO {
private long recordsWritten;
}
}
-
- public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..ac81e8a
--- /dev/null
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HBaseCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HBaseCoderProviderRegistrarTest {
+ @Test
+ public void testResultCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(Result.class);
+ }
+
+ @Test
+ public void testMutationCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(Mutation.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index d081139..806a27f 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -295,8 +295,7 @@ public class HBaseIOTest {
createTable(table);
- p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
- .withCoder(HBaseIO.WRITE_CODER))
+ p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
p.run().waitUntilFinish();
@@ -309,7 +308,7 @@ public class HBaseIOTest {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE-DOES-NOT-EXIST";
- p.apply(Create.empty(HBaseIO.WRITE_CODER))
+ p.apply(Create.empty(HBaseMutationCoder.of()))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
// Exception will be thrown by write.validate() when write is applied.
@@ -325,8 +324,8 @@ public class HBaseIOTest {
final String key = "KEY";
createTable(table);
- p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
- .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+ p.apply(Create.of(makeBadMutation(key)))
+ .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
thrown.expect(Pipeline.PipelineExecutionException.class);
thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));