You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/06/20 09:49:54 UTC
[1/3] beam git commit: [BEAM-2411] Make the write transform of HBaseIO simpler
Repository: beam
Updated Branches:
refs/heads/master 2304972c5 -> eae0d05bd
[BEAM-2411] Make the write transform of HBaseIO simpler
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ec59a08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ec59a08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ec59a08
Branch: refs/heads/master
Commit: 1ec59a08a3fab5ac0918d7f1a33b82427957b630
Parents: 2304972
Author: Ismaël Mejía <ie...@apache.org>
Authored: Mon Jun 5 23:48:38 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 09:04:00 2017 +0200
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 45 +++++++-------------
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 37 +++++++---------
2 files changed, 32 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 849873c..626fad9 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -31,10 +31,7 @@ import java.util.Set;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.range.ByteKey;
@@ -44,7 +41,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -122,16 +118,15 @@ import org.slf4j.LoggerFactory;
* <h3>Writing to HBase</h3>
*
* <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<KV<byte[], Iterable<Mutation>>>}, where the
- * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
+ * {@link PCollection PCollection<Mutation>}, where each {@link Mutation} represents an
+ * idempotent transformation on a row.
*
* <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
* to identify the HBase instance, for example:
*
* <pre>{@code
* Configuration configuration = ...;
- * PCollection<KV<byte[], Iterable<Mutation>>> data = ...;
+ * PCollection<Mutation> data = ...;
* data.setCoder(HBaseIO.WRITE_CODER);
*
* data.apply("write",
@@ -545,9 +540,7 @@ public class HBaseIO {
*
* @see HBaseIO
*/
- public static class Write
- extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> {
-
+ public static class Write extends PTransform<PCollection<Mutation>, PDone> {
/**
* Returns a new {@link HBaseIO.Write} that will write to the HBase instance
* indicated by the given Configuration, and using any other specified customizations.
@@ -575,7 +568,7 @@ public class HBaseIO {
}
@Override
- public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) {
+ public PDone expand(PCollection<Mutation> input) {
input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
return PDone.in(input.getPipeline());
}
@@ -613,7 +606,7 @@ public class HBaseIO {
private final String tableId;
private final SerializableConfiguration serializableConfiguration;
- private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> {
+ private class HBaseWriterFn extends DoFn<Mutation, Void> {
public HBaseWriterFn(String tableId,
SerializableConfiguration serializableConfiguration) {
@@ -624,31 +617,27 @@ public class HBaseIO {
@Setup
public void setup() throws Exception {
- Configuration configuration = this.serializableConfiguration.get();
- connection = ConnectionFactory.createConnection(configuration);
+ connection = ConnectionFactory.createConnection(serializableConfiguration.get());
+ }
- TableName tableName = TableName.valueOf(tableId);
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
BufferedMutatorParams params =
- new BufferedMutatorParams(tableName);
+ new BufferedMutatorParams(TableName.valueOf(tableId));
mutator = connection.getBufferedMutator(params);
-
recordsWritten = 0;
}
@ProcessElement
- public void processElement(ProcessContext ctx) throws Exception {
- KV<byte[], Iterable<Mutation>> record = ctx.element();
- List<Mutation> mutations = new ArrayList<>();
- for (Mutation mutation : record.getValue()) {
- mutations.add(mutation);
- ++recordsWritten;
- }
- mutator.mutate(mutations);
+ public void processElement(ProcessContext c) throws Exception {
+ mutator.mutate(c.element());
+ ++recordsWritten;
}
@FinishBundle
public void finishBundle() throws Exception {
mutator.flush();
+ LOG.debug("Wrote {} records", recordsWritten);
}
@Teardown
@@ -661,7 +650,6 @@ public class HBaseIO {
connection.close();
connection = null;
}
- LOG.debug("Wrote {} records", recordsWritten);
}
@Override
@@ -679,6 +667,5 @@ public class HBaseIO {
}
}
- public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER =
- KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
+ public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 005770d..d081139 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -292,15 +291,17 @@ public class HBaseIOTest {
final String table = "table";
final String key = "key";
final String value = "value";
+ final int numMutations = 100;
createTable(table);
- p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER))
- .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+ p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
+ .withCoder(HBaseIO.WRITE_CODER))
+ .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
p.run().waitUntilFinish();
List<Result> results = readTable(table, new Scan());
- assertEquals(1, results.size());
+ assertEquals(numMutations, results.size());
}
/** Tests that when writing to a non-existent table, the write fails. */
@@ -308,10 +309,8 @@ public class HBaseIOTest {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE-DOES-NOT-EXIST";
- PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
- p.apply(Create.empty(HBaseIO.WRITE_CODER));
-
- emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+ p.apply(Create.empty(HBaseIO.WRITE_CODER))
+ .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
// Exception will be thrown by write.validate() when write is applied.
thrown.expect(IllegalArgumentException.class);
@@ -326,7 +325,7 @@ public class HBaseIOTest {
final String key = "KEY";
createTable(table);
- p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
+ p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
.apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
thrown.expect(Pipeline.PipelineExecutionException.class);
@@ -405,26 +404,22 @@ public class HBaseIOTest {
// Beam helper methods
/** Helper function to make a single row mutation to be written. */
- private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
- byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+ private static Iterable<Mutation> makeMutations(String key, String value, int numMutations) {
List<Mutation> mutations = new ArrayList<>();
- mutations.add(makeMutation(key, value));
- return KV.of(rowKey, (Iterable<Mutation>) mutations);
+ for (int i = 0; i < numMutations; i++) {
+ mutations.add(makeMutation(key + i, value));
+ }
+ return mutations;
}
-
private static Mutation makeMutation(String key, String value) {
- byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
- return new Put(rowKey)
+ return new Put(key.getBytes(StandardCharsets.UTF_8))
.addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
.addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
}
- private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
- Put put = new Put(key.getBytes());
- List<Mutation> mutations = new ArrayList<>();
- mutations.add(put);
- return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations);
+ private static Mutation makeBadMutation(String key) {
+ return new Put(key.getBytes());
}
private void runReadTest(HBaseIO.Read read, List<Result> expected) {
[2/3] beam git commit: [BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference
Posted by ie...@apache.org.
[BEAM-2411] Add HBaseCoderProviderRegistrar for better coder inference
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d42f6333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d42f6333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d42f6333
Branch: refs/heads/master
Commit: d42f6333141e85964d009110d8bea85ad4763632
Parents: 1ec59a0
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 10:00:15 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 10:00:15 2017 +0200
----------------------------------------------------------------------
sdks/java/io/hbase/pom.xml | 6 +++
.../io/hbase/HBaseCoderProviderRegistrar.java | 49 ++++++++++++++++++++
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 3 --
.../hbase/HBaseCoderProviderRegistrarTest.java | 41 ++++++++++++++++
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 9 ++--
5 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f81cd24..4d9d600 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -64,6 +64,12 @@
</dependency>
<dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>${hbase.version}</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
new file mode 100644
index 0000000..dee3c70
--- /dev/null
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(TypeDescriptor.of(Append.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Delete.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Increment.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Mutation.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Put.class), HBaseMutationCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 626fad9..c9afe89 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -127,7 +127,6 @@ import org.slf4j.LoggerFactory;
* <pre>{@code
* Configuration configuration = ...;
* PCollection<Mutation> data = ...;
- * data.setCoder(HBaseIO.WRITE_CODER);
*
* data.apply("write",
* HBaseIO.write()
@@ -666,6 +665,4 @@ public class HBaseIO {
private long recordsWritten;
}
}
-
- public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..ac81e8a
--- /dev/null
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HBaseCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HBaseCoderProviderRegistrarTest {
+ @Test
+ public void testResultCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(Result.class);
+ }
+
+ @Test
+ public void testMutationCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(Mutation.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d42f6333/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index d081139..806a27f 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -295,8 +295,7 @@ public class HBaseIOTest {
createTable(table);
- p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations))
- .withCoder(HBaseIO.WRITE_CODER))
+ p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
p.run().waitUntilFinish();
@@ -309,7 +308,7 @@ public class HBaseIOTest {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE-DOES-NOT-EXIST";
- p.apply(Create.empty(HBaseIO.WRITE_CODER))
+ p.apply(Create.empty(HBaseMutationCoder.of()))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
// Exception will be thrown by write.validate() when write is applied.
@@ -325,8 +324,8 @@ public class HBaseIOTest {
final String key = "KEY";
createTable(table);
- p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER))
- .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+ p.apply(Create.of(makeBadMutation(key)))
+ .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
thrown.expect(Pipeline.PipelineExecutionException.class);
thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
[3/3] beam git commit: This closes #3391
Posted by ie...@apache.org.
This closes #3391
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eae0d05b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eae0d05b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eae0d05b
Branch: refs/heads/master
Commit: eae0d05bd7c088accd927dcfe3e511efbb11c9fd
Parents: 2304972 d42f633
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Jun 20 11:49:25 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Jun 20 11:49:25 2017 +0200
----------------------------------------------------------------------
sdks/java/io/hbase/pom.xml | 6 +++
.../io/hbase/HBaseCoderProviderRegistrar.java | 49 ++++++++++++++++++++
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 46 ++++++------------
.../hbase/HBaseCoderProviderRegistrarTest.java | 41 ++++++++++++++++
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 38 +++++++--------
5 files changed, 127 insertions(+), 53 deletions(-)
----------------------------------------------------------------------