You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/27 19:36:32 UTC

[1/2] incubator-beam git commit: Closes #505

Repository: incubator-beam
Updated Branches:
  refs/heads/master 05a1d20b2 -> fc803f60e


Closes #505


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc803f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc803f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc803f60

Branch: refs/heads/master
Commit: fc803f60ef2d703adfa80ebef60108a92ad9cc12
Parents: 05a1d20 76173da
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jun 27 12:36:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jun 27 12:36:22 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/google-cloud-platform/pom.xml      |  47 +++++
 .../io/gcp/bigtable/BigtableTestOptions.java    |  42 ++++
 .../sdk/io/gcp/bigtable/BigtableReadIT.java     |  61 ++++++
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    | 197 +++++++++++++++++++
 4 files changed, 347 insertions(+)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Added integration tests for BigtableRead and BigtableWrite

Posted by dh...@apache.org.
Added integration tests for BigtableRead and BigtableWrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76173da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76173da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76173da2

Branch: refs/heads/master
Commit: 76173da215dc9878b9fa49cfe9a77541e2adf4be
Parents: 05a1d20
Author: Ian Zhou <ia...@google.com>
Authored: Fri Jun 17 14:21:05 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jun 27 12:36:22 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/google-cloud-platform/pom.xml      |  47 +++++
 .../io/gcp/bigtable/BigtableTestOptions.java    |  42 ++++
 .../sdk/io/gcp/bigtable/BigtableReadIT.java     |  61 ++++++
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    | 197 +++++++++++++++++++
 4 files changed, 347 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 5786e84..bb5fd11 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -53,7 +53,39 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
+
+      <!-- Integration Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <useManifestOnlyJar>false</useManifestOnlyJar>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
+
+    <extensions>
+      <!-- Use os-maven-plugin to initialize the "os.detected" properties -->
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.4.0.Final</version>
+      </extension>
+    </extensions>
   </build>
 
   <dependencies>
@@ -99,6 +131,14 @@
       <artifactId>jsr305</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-tcnative-boringssl-static</artifactId>
+      <version>1.1.33.Fork13</version>
+      <classifier>${os.detected.classifier}</classifier>
+      <scope>runtime</scope>
+    </dependency>
+
     <!--  test -->
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -115,6 +155,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
new file mode 100644
index 0000000..0cd4f57
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Pipeline options read by the Bigtable integration tests. Each option names one part of the
+ * Bigtable location (project, cluster, zone) that the tests connect to, and each has a default.
+ */
+public interface BigtableTestOptions extends TestPipelineOptions {
+
+  /** Returns the Bigtable project ID; defaults to {@code apache-beam-testing}. */
+  @Description("Project ID for Bigtable")
+  @Default.String("apache-beam-testing")
+  String getProjectId();
+
+  /** Overrides the Bigtable project ID. */
+  void setProjectId(String value);
+
+  /** Returns the Bigtable cluster ID; defaults to {@code beam-test}. */
+  @Description("Cluster ID for Bigtable")
+  @Default.String("beam-test")
+  String getClusterId();
+
+  /** Overrides the Bigtable cluster ID. */
+  void setClusterId(String value);
+
+  /** Returns the Bigtable zone ID; defaults to {@code us-central1-c}. */
+  @Description("Zone ID for Bigtable")
+  @Default.String("us-central1-c")
+  String getZoneId();
+
+  /** Overrides the Bigtable zone ID. */
+  void setZoneId(String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
new file mode 100644
index 0000000..22d5b5b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.bigtable.v1.Row;
+import com.google.cloud.bigtable.config.BigtableOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end tests of BigtableRead.
+ *
+ * <p>The table read here is not created by this test; it is expected to already exist and to
+ * hold a known number of rows.
+ */
+@RunWith(JUnit4.class)
+public class BigtableReadIT {
+
+  /** Reads every row of {@code BigtableReadTest} and asserts that exactly 1000 rows come back. */
+  @Test
+  public void testE2EBigtableRead() throws Exception {
+    // Make the Bigtable-specific options known before viewing the test options through them.
+    PipelineOptionsFactory.register(BigtableTestOptions.class);
+    BigtableTestOptions testOptions =
+        TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+    // Point the Bigtable client at the project/cluster/zone chosen by the test options.
+    BigtableOptions.Builder optionsBuilder =
+        new BigtableOptions.Builder()
+            .setProjectId(testOptions.getProjectId())
+            .setClusterId(testOptions.getClusterId())
+            .setZoneId(testOptions.getZoneId());
+
+    final String sourceTable = "BigtableReadTest";
+    final long expectedRowCount = 1000L;
+
+    Pipeline pipeline = Pipeline.create(testOptions);
+    PCollection<Row> rows = pipeline.apply(
+        BigtableIO.read().withBigtableOptions(optionsBuilder).withTableId(sourceTable));
+    PCollection<Long> rowCount = rows.apply(Count.<Row>globally());
+    PAssert.thatSingleton(rowCount).isEqualTo(expectedRowCount);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76173da2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
new file mode 100644
index 0000000..af7afc5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.admin.table.v1.ColumnFamily;
+import com.google.bigtable.admin.table.v1.CreateTableRequest;
+import com.google.bigtable.admin.table.v1.DeleteTableRequest;
+import com.google.bigtable.admin.table.v1.GetTableRequest;
+import com.google.bigtable.admin.table.v1.Table;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end tests of BigtableWrite.
+ *
+ * <p>Each test creates a new, timestamp-named table and writes generated key/value rows into it
+ * through a pipeline. It then reads the table back to check its column families and contents.
+ * The table is deleted after the test.
+ */
+@RunWith(JUnit4.class)
+public class BigtableWriteIT implements Serializable {
+  /**
+   * These tests require static instances because the writers go through a serialization step
+   * when executing the test and would not otherwise affect passed-in objects.
+   */
+  private static final String COLUMN_FAMILY_NAME = "cf";
+  private static BigtableTestOptions options;
+  private BigtableOptions bigtableOptions;
+  private static BigtableSession session;
+  private static BigtableTableAdminClient tableAdminClient;
+  /** ID of the table under test, made unique per instance with a millisecond timestamp. */
+  private final String tableId =
+      String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
+
+  /** Builds the Bigtable options from the test options and opens a session plus admin client. */
+  @Before
+  public void setup() throws Exception {
+    PipelineOptionsFactory.register(BigtableTestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+    BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
+        .setProjectId(options.getProjectId())
+        .setClusterId(options.getClusterId())
+        .setZoneId(options.getZoneId())
+        .setUserAgent("apache-beam-test");
+    bigtableOptions = bigtableOptionsBuilder.build();
+
+    session = new BigtableSession(bigtableOptions);
+    tableAdminClient = session.getTableAdminClient();
+  }
+
+  /**
+   * Writes {@code numRows} generated key/value pairs through {@link BigtableIO#write()} into an
+   * empty table. It then verifies that the table has exactly one column family, named
+   * {@link #COLUMN_FAMILY_NAME}, and that the stored data matches what was generated.
+   */
+  @Test
+  public void testE2EBigtableWrite() throws Exception {
+    final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+    final String clusterName = bigtableOptions.getClusterName().toString();
+    final int numRows = 1000;
+    final List<KV<ByteString, ByteString>> testData = generateTableData(numRows);
+
+    createEmptyTable(clusterName, tableId);
+
+    Pipeline p = Pipeline.create(options);
+    p.apply(CountingInput.upTo(numRows))
+        .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            // Each input element is used as an index into testData.
+            int index = c.element().intValue();
+
+            // A single SetCell mutation carries the value into the test column family.
+            Iterable<Mutation> mutations =
+                ImmutableList.of(Mutation.newBuilder()
+                    .setSetCell(
+                        Mutation.SetCell.newBuilder()
+                            .setValue(testData.get(index).getValue())
+                            .setFamilyName(COLUMN_FAMILY_NAME))
+                    .build());
+            c.output(KV.of(testData.get(index).getKey(), mutations));
+          }
+        }))
+        .apply(BigtableIO.write()
+          .withBigtableOptions(bigtableOptions)
+          .withTableId(tableId));
+    p.run();
+
+    // Test number of column families and column family name equality
+    Table table = getTable(tableName);
+    assertThat(table.getColumnFamilies().keySet(), Matchers.hasSize(1));
+    assertThat(table.getColumnFamilies(), Matchers.hasKey(COLUMN_FAMILY_NAME));
+
+    // Test table data equality
+    List<KV<ByteString, ByteString>> tableData = getTableData(tableName);
+    assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray()));
+  }
+
+  /**
+   * Deletes the table under test and closes the session.
+   *
+   * <p>The delete can fail, for example when the test failed before the table was created. The
+   * session is closed in a {@code finally} block so a failed delete does not leak it.
+   */
+  @After
+  public void tearDown() throws Exception {
+    final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+    try {
+      deleteTable(tableName);
+    } finally {
+      session.close();
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * Helper function to generate KV test data.
+   *
+   * @param numRows number of pairs to generate
+   * @return pairs {@code ("key%09d", "value%09d")} for indices {@code 0 .. numRows - 1}
+   */
+  private List<KV<ByteString, ByteString>> generateTableData(int numRows) {
+    List<KV<ByteString, ByteString>> testData = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      ByteString key = ByteString.copyFromUtf8(String.format("key%09d", i));
+      ByteString value = ByteString.copyFromUtf8(String.format("value%09d", i));
+      testData.add(KV.of(key, value));
+    }
+
+    return testData;
+  }
+
+  /**
+   * Helper function to create an empty table.
+   *
+   * @param clusterName fully qualified cluster name in which to create the table
+   * @param tableId ID of the new table; it gets the single column family {@link #COLUMN_FAMILY_NAME}
+   */
+  private void createEmptyTable(String clusterName, String tableId) {
+    Table.Builder tableBuilder = Table.newBuilder();
+    Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies();
+    columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
+
+    CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder()
+        .setName(clusterName)
+        .setTableId(tableId)
+        .setTable(tableBuilder.build());
+    tableAdminClient.createTable(createTableRequestBuilder.build());
+  }
+
+  /**
+   * Helper function to get a table.
+   *
+   * @param tableName fully qualified table name
+   * @return the table metadata returned by the admin client
+   */
+  private Table getTable(String tableName) {
+    GetTableRequest.Builder getTableRequestBuilder = GetTableRequest.newBuilder()
+        .setName(tableName);
+    return tableAdminClient.getTable(getTableRequestBuilder.build());
+  }
+
+  /**
+   * Helper function to get a table's data.
+   *
+   * <p>For each row, only the value of the first cell of the first column of the first family
+   * is read.
+   *
+   * @param tableName fully qualified table name
+   * @return one (row key, value) pair per row
+   * @throws IOException if reading from or closing the scanner fails
+   */
+  private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
+    List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
+    ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
+        .setTableName(tableName);
+    ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
+
+    // Close the scanner even if reading a row throws.
+    try {
+      Row currentRow;
+      while ((currentRow = scanner.next()) != null) {
+        ByteString key = currentRow.getKey();
+        ByteString value = currentRow.getFamilies(0).getColumns(0).getCells(0).getValue();
+        tableData.add(KV.of(key, value));
+      }
+    } finally {
+      scanner.close();
+    }
+
+    return tableData;
+  }
+
+  /**
+   * Helper function to delete a table.
+   *
+   * @param tableName fully qualified name of the table to delete
+   */
+  private void deleteTable(String tableName) {
+    DeleteTableRequest.Builder deleteTableRequestBuilder = DeleteTableRequest.newBuilder()
+        .setName(tableName);
+    tableAdminClient.deleteTable(deleteTableRequestBuilder.build());
+  }
+}