You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/27 19:36:32 UTC
[1/2] incubator-beam git commit: Closes #505
Repository: incubator-beam
Updated Branches:
refs/heads/master 05a1d20b2 -> fc803f60e
Closes #505
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc803f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc803f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc803f60
Branch: refs/heads/master
Commit: fc803f60ef2d703adfa80ebef60108a92ad9cc12
Parents: 05a1d20 76173da
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jun 27 12:36:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jun 27 12:36:22 2016 -0700
----------------------------------------------------------------------
sdks/java/io/google-cloud-platform/pom.xml | 47 +++++
.../io/gcp/bigtable/BigtableTestOptions.java | 42 ++++
.../sdk/io/gcp/bigtable/BigtableReadIT.java | 61 ++++++
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 197 +++++++++++++++++++
4 files changed, 347 insertions(+)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Added integration tests for
BigtableRead and BigtableWrite
Posted by dh...@apache.org.
Added integration tests for BigtableRead and BigtableWrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76173da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76173da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76173da2
Branch: refs/heads/master
Commit: 76173da215dc9878b9fa49cfe9a77541e2adf4be
Parents: 05a1d20
Author: Ian Zhou <ia...@google.com>
Authored: Fri Jun 17 14:21:05 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jun 27 12:36:22 2016 -0700
----------------------------------------------------------------------
sdks/java/io/google-cloud-platform/pom.xml | 47 +++++
.../io/gcp/bigtable/BigtableTestOptions.java | 42 ++++
.../sdk/io/gcp/bigtable/BigtableReadIT.java | 61 ++++++
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 197 +++++++++++++++++++
4 files changed, 347 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 5786e84..bb5fd11 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -53,7 +53,39 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
+
+ <!-- Integration Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <useManifestOnlyJar>false</useManifestOnlyJar>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
+
+ <extensions>
+ <!-- Use os-maven-plugin to initialize the "os.detected" properties -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.4.0.Final</version>
+ </extension>
+ </extensions>
</build>
<dependencies>
@@ -99,6 +131,14 @@
<artifactId>jsr305</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>1.1.33.Fork13</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>runtime</scope>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.apache.beam</groupId>
@@ -115,6 +155,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
new file mode 100644
index 0000000..0cd4f57
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Pipeline options for the Bigtable integration tests, layered on top of
+ * {@link TestPipelineOptions}.
+ *
+ * <p>The options identify the Bigtable project, cluster and zone the tests connect to. The defaults
+ * point at the {@code apache-beam-testing} project; presumably they are overridden through the
+ * test pipeline options supplied to the integration-test run (confirm against the failsafe
+ * {@code beamTestPipelineOptions} configuration).
+ *
+ * <p>NOTE(review): this interface is only used by tests but lives under {@code src/main}; consider
+ * moving it to {@code src/test}.
+ */
+public interface BigtableTestOptions extends TestPipelineOptions {
+ /** Project ID of the Bigtable cluster under test. Default: {@code apache-beam-testing}. */
+ @Description("Project ID for Bigtable")
+ @Default.String("apache-beam-testing")
+ String getProjectId();
+ /** Sets the value returned by {@link #getProjectId()}. */
+ void setProjectId(String value);
+
+ /** Cluster ID of the Bigtable cluster under test. Default: {@code beam-test}. */
+ @Description("Cluster ID for Bigtable")
+ @Default.String("beam-test")
+ String getClusterId();
+ /** Sets the value returned by {@link #getClusterId()}. */
+ void setClusterId(String value);
+
+ /** Zone of the Bigtable cluster under test. Default: {@code us-central1-c}. */
+ @Description("Zone ID for Bigtable")
+ @Default.String("us-central1-c")
+ String getZoneId();
+ /** Sets the value returned by {@link #getZoneId()}. */
+ void setZoneId(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
new file mode 100644
index 0000000..22d5b5b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.bigtable.v1.Row;
+import com.google.cloud.bigtable.config.BigtableOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end test of {@link BigtableIO#read()}.
+ *
+ * <p>Reads every row of an existing table and asserts the total row count. The test neither
+ * creates nor populates the table, so it must already exist with the expected number of rows.
+ */
+@RunWith(JUnit4.class)
+public class BigtableReadIT {
+  /** Table read by the test; it is expected to exist before the test runs. */
+  private static final String TABLE_ID = "BigtableReadTest";
+
+  /** Number of rows {@link #TABLE_ID} is expected to contain. */
+  private static final long EXPECTED_ROW_COUNT = 1000L;
+
+  @Test
+  public void testE2EBigtableRead() throws Exception {
+    PipelineOptionsFactory.register(BigtableTestOptions.class);
+    BigtableTestOptions testOptions =
+        TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+    // Cluster coordinates come from the test options rather than being hard-coded here.
+    BigtableOptions.Builder connection = new BigtableOptions.Builder()
+        .setProjectId(testOptions.getProjectId())
+        .setClusterId(testOptions.getClusterId())
+        .setZoneId(testOptions.getZoneId());
+
+    Pipeline pipeline = Pipeline.create(testOptions);
+    PCollection<Row> rows = pipeline.apply(
+        BigtableIO.read().withBigtableOptions(connection).withTableId(TABLE_ID));
+    PCollection<Long> rowCount = rows.apply(Count.<Row>globally());
+
+    PAssert.thatSingleton(rowCount).isEqualTo(EXPECTED_ROW_COUNT);
+    pipeline.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
new file mode 100644
index 0000000..af7afc5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.admin.table.v1.ColumnFamily;
+import com.google.bigtable.admin.table.v1.CreateTableRequest;
+import com.google.bigtable.admin.table.v1.DeleteTableRequest;
+import com.google.bigtable.admin.table.v1.GetTableRequest;
+import com.google.bigtable.admin.table.v1.Table;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end tests of BigtableWrite.
+ */
+@RunWith(JUnit4.class)
+public class BigtableWriteIT implements Serializable {
+ /**
+ * These tests requires a static instances because the writers go through a serialization step
+ * when executing the test and would not affect passed-in objects otherwise.
+ */
+ private static final String COLUMN_FAMILY_NAME = "cf";
+ private static BigtableTestOptions options;
+ private BigtableOptions bigtableOptions;
+ private static BigtableSession session;
+ private static BigtableTableAdminClient tableAdminClient;
+ private final String tableId =
+ String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
+
+ @Before
+ public void setup() throws Exception {
+ PipelineOptionsFactory.register(BigtableTestOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+ BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
+ .setProjectId(options.getProjectId())
+ .setClusterId(options.getClusterId())
+ .setZoneId(options.getZoneId())
+ .setUserAgent("apache-beam-test");
+ bigtableOptions = bigtableOptionsBuilder.build();
+
+ session = new BigtableSession(bigtableOptions);
+ tableAdminClient = session.getTableAdminClient();
+ }
+
+ @Test
+ public void testE2EBigtableWrite() throws Exception {
+ final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+ final String clusterName = bigtableOptions.getClusterName().toString();
+ final int numRows = 1000;
+ final List<KV<ByteString, ByteString>> testData = generateTableData(numRows);
+
+ createEmptyTable(clusterName, tableId);
+
+ Pipeline p = Pipeline.create(options);
+ p.apply(CountingInput.upTo(numRows))
+ .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ int index = c.element().intValue();
+
+ Iterable<Mutation> mutations =
+ ImmutableList.of(Mutation.newBuilder()
+ .setSetCell(
+ Mutation.SetCell.newBuilder()
+ .setValue(testData.get(index).getValue())
+ .setFamilyName(COLUMN_FAMILY_NAME))
+ .build());
+ c.output(KV.of(testData.get(index).getKey(), mutations));
+ }
+ }))
+ .apply(BigtableIO.write()
+ .withBigtableOptions(bigtableOptions)
+ .withTableId(tableId));
+ p.run();
+
+ // Test number of column families and column family name equality
+ Table table = getTable(tableName);
+ assertThat(table.getColumnFamilies().keySet(), Matchers.hasSize(1));
+ assertThat(table.getColumnFamilies(), Matchers.hasKey(COLUMN_FAMILY_NAME));
+
+ // Test table data equality
+ List<KV<ByteString, ByteString>> tableData = getTableData(tableName);
+ assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray()));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+ deleteTable(tableName);
+ session.close();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ /** Helper function to generate KV test data. */
+ private List<KV<ByteString, ByteString>> generateTableData(int numRows) {
+ List<KV<ByteString, ByteString>> testData = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; ++i) {
+ ByteString key = ByteString.copyFromUtf8(String.format("key%09d", i));
+ ByteString value = ByteString.copyFromUtf8(String.format("value%09d", i));
+ testData.add(KV.of(key, value));
+ }
+
+ return testData;
+ }
+
+ /** Helper function to create an empty table. */
+ private void createEmptyTable(String clusterName, String tableId) {
+ Table.Builder tableBuilder = Table.newBuilder();
+ Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies();
+ columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
+
+ CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder()
+ .setName(clusterName)
+ .setTableId(tableId)
+ .setTable(tableBuilder.build());
+ tableAdminClient.createTable(createTableRequestBuilder.build());
+ }
+
+ /** Helper function to get a table. */
+ private Table getTable(String tableName) {
+ GetTableRequest.Builder getTableRequestBuilder = GetTableRequest.newBuilder()
+ .setName(tableName);
+ return tableAdminClient.getTable(getTableRequestBuilder.build());
+ }
+
+ /** Helper function to get a table's data. */
+ private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
+ List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
+ ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
+ .setTableName(tableName);
+ ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
+
+ Row currentRow;
+ while ((currentRow = scanner.next()) != null) {
+ ByteString key = currentRow.getKey();
+ ByteString value = currentRow.getFamilies(0).getColumns(0).getCells(0).getValue();
+ tableData.add(KV.of(key, value));
+ }
+ scanner.close();
+
+ return tableData;
+ }
+
+ /** Helper function to delete a table. */
+ private void deleteTable(String tableName) {
+ DeleteTableRequest.Builder deleteTableRequestBuilder = DeleteTableRequest.newBuilder()
+ .setName(tableName);
+ tableAdminClient.deleteTable(deleteTableRequestBuilder.build());
+ }
+}