You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/27 21:46:28 UTC
[1/2] beam git commit: Reading spanner schema transform
Repository: beam
Updated Branches:
refs/heads/master a6f69bd12 -> e686286f1
Reading spanner schema transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fd027b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fd027b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fd027b2
Branch: refs/heads/master
Commit: 1fd027b206977fbe5ff6011e3b006836087f5d08
Parents: a6f69bd
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Oct 18 15:18:55 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 27 14:44:30 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/ReadSpannerSchema.java | 94 ++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerSchema.java | 144 +++++++++++++++++++
.../io/gcp/spanner/ReadSpannerSchemaTest.java | 134 +++++++++++++++++
.../sdk/io/gcp/spanner/SpannerSchemaTest.java | 61 ++++++++
4 files changed, 433 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
new file mode 100644
index 0000000..e2ade68
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * This {@link DoFn} reads Cloud Spanner 'information_schema.*' tables to build the
+ * {@link SpannerSchema}.
+ *
+ * <p>For every input element, column metadata and primary key metadata are queried through the
+ * same {@link ReadOnlyTransaction}, and a single {@link SpannerSchema} is emitted.
+ */
+class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
+
+  private final SpannerConfig config;
+
+  // Transient: the connection is (re)created in setup() instead of being serialized with the DoFn.
+  private transient SpannerAccessor spannerAccessor;
+
+  public ReadSpannerSchema(SpannerConfig config) {
+    this.config = config;
+  }
+
+  /** Opens the Cloud Spanner connection described by the config. */
+  @Setup
+  public void setup() throws Exception {
+    spannerAccessor = config.connectToSpanner();
+  }
+
+  /** Releases the Cloud Spanner connection opened in {@link #setup()}. */
+  @Teardown
+  public void teardown() throws Exception {
+    spannerAccessor.close();
+  }
+
+  /**
+   * Reads column and primary key metadata and outputs the resulting {@link SpannerSchema}.
+   *
+   * @param c the process context used to emit the schema
+   */
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      // Each result set is closed as soon as it has been consumed.
+      try (ResultSet resultSet = readTableInfo(tx)) {
+        while (resultSet.next()) {
+          String tableName = resultSet.getString(0);
+          String columnName = resultSet.getString(1);
+          String type = resultSet.getString(2);
+
+          builder.addColumn(tableName, columnName, type);
+        }
+      }
+
+      try (ResultSet resultSet = readPrimaryKeyInfo(tx)) {
+        while (resultSet.next()) {
+          String tableName = resultSet.getString(0);
+          String columnName = resultSet.getString(1);
+          String ordering = resultSet.getString(2);
+
+          // Locale-independent, case-insensitive comparison.
+          builder.addKeyPart(tableName, columnName, "DESC".equalsIgnoreCase(ordering));
+        }
+      }
+    }
+    c.output(builder.build());
+  }
+
+  /**
+   * Queries (table_name, column_name, spanner_type) for the columns in the default (empty)
+   * catalog and schema, ordered by table name and ordinal position.
+   */
+  private ResultSet readTableInfo(ReadOnlyTransaction tx) {
+    return tx.executeQuery(Statement.of(
+        "SELECT c.table_name, c.column_name, c.spanner_type"
+            + " FROM information_schema.columns as c"
+            + " WHERE c.table_catalog = '' AND c.table_schema = ''"
+            + " ORDER BY c.table_name, c.ordinal_position"));
+  }
+
+  /**
+   * Queries (table_name, column_name, column_ordering) for the 'PRIMARY_KEY' index columns in the
+   * default (empty) catalog and schema, ordered by table name and ordinal position.
+   */
+  private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
+    return tx.executeQuery(Statement
+        .of("SELECT t.table_name, t.column_name, t.column_ordering"
+            + " FROM information_schema.index_columns AS t "
+            + " WHERE t.index_name = 'PRIMARY_KEY' AND t.table_catalog = ''"
+            + " AND t.table_schema = ''"
+            + " ORDER BY t.table_name, t.ordinal_position"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
new file mode 100644
index 0000000..4c12b8d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.Type;
+import com.google.common.collect.ArrayListMultimap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Encapsulates Cloud Spanner Schema.
+ *
+ * <p>Table and column names are normalized to lower case (using {@link Locale#ROOT}) when they are
+ * added, and the table name passed to {@link #getColumns(String)} and
+ * {@link #getKeyParts(String)} is normalized the same way, so lookups are case-insensitive.
+ */
+class SpannerSchema implements Serializable {
+  private final List<String> tables;
+  private final ArrayListMultimap<String, Column> columns;
+  private final ArrayListMultimap<String, KeyPart> keyParts;
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link SpannerSchema}.
+   */
+  static class Builder {
+    private final ArrayListMultimap<String, Column> columns = ArrayListMultimap.create();
+    private final ArrayListMultimap<String, KeyPart> keyParts = ArrayListMultimap.create();
+
+    /**
+     * Registers a column of the given table.
+     *
+     * @param table table name (stored lower-cased)
+     * @param name column name (stored lower-cased)
+     * @param type Cloud Spanner type string, e.g. {@code STRING(MAX)} or {@code ARRAY<INT64>}
+     * @throws IllegalArgumentException if {@code type} is not a recognized type string
+     */
+    public Builder addColumn(String table, String name, String type) {
+      addColumn(table, Column.create(normalize(name), type));
+      return this;
+    }
+
+    private Builder addColumn(String table, Column column) {
+      columns.put(normalize(table), column);
+      return this;
+    }
+
+    /**
+     * Registers a primary key part of the given table.
+     *
+     * @param table table name (stored lower-cased, consistently with {@code addColumn})
+     * @param column key column name (stored lower-cased)
+     * @param desc whether the key part is ordered descending
+     */
+    public Builder addKeyPart(String table, String column, boolean desc) {
+      keyParts.put(normalize(table), KeyPart.create(normalize(column), desc));
+      return this;
+    }
+
+    public SpannerSchema build() {
+      return new SpannerSchema(columns, keyParts);
+    }
+  }
+
+  /** Lower-cases an identifier independently of the JVM's default locale. */
+  private static String normalize(String identifier) {
+    return identifier.toLowerCase(Locale.ROOT);
+  }
+
+  private SpannerSchema(ArrayListMultimap<String, Column> columns,
+      ArrayListMultimap<String, KeyPart> keyParts) {
+    // Copy so that later mutations of the builder cannot change an already built schema.
+    this.columns = ArrayListMultimap.create(columns);
+    this.keyParts = ArrayListMultimap.create(keyParts);
+    tables = new ArrayList<>(this.columns.keySet());
+  }
+
+  /** Returns the (lower-cased) names of all tables that have at least one column. */
+  public List<String> getTables() {
+    return tables;
+  }
+
+  /** Returns the columns of {@code table} in insertion order; empty if the table is unknown. */
+  public List<Column> getColumns(String table) {
+    return columns.get(normalize(table));
+  }
+
+  /** Returns the key parts of {@code table} in insertion order; empty if the table is unknown. */
+  public List<KeyPart> getKeyParts(String table) {
+    return keyParts.get(normalize(table));
+  }
+
+  /** A single part of a table's primary key: the column name and its ordering. */
+  @AutoValue
+  abstract static class KeyPart implements Serializable {
+    static KeyPart create(String field, boolean desc) {
+      return new AutoValue_SpannerSchema_KeyPart(field, desc);
+    }
+
+    abstract String getField();
+
+    abstract boolean isDesc();
+  }
+
+  /** A table column: its name and its Cloud Spanner {@link Type}. */
+  @AutoValue
+  abstract static class Column implements Serializable {
+
+    static Column create(String name, Type type) {
+      return new AutoValue_SpannerSchema_Column(name, type);
+    }
+
+    /**
+     * Creates a column from a Cloud Spanner type string.
+     *
+     * @throws IllegalArgumentException if {@code spannerType} is not a recognized type string
+     */
+    static Column create(String name, String spannerType) {
+      return create(name, parseSpannerType(spannerType));
+    }
+
+    public abstract String getName();
+
+    public abstract Type getType();
+
+    /**
+     * Converts a type string such as {@code INT64}, {@code STRING(48)} or {@code ARRAY<BOOL>}
+     * into a {@link Type}. Matching is case-insensitive; length arguments of STRING and BYTES
+     * are ignored.
+     */
+    private static Type parseSpannerType(String spannerType) {
+      spannerType = spannerType.toUpperCase(Locale.ROOT);
+      if (spannerType.equals("BOOL")) {
+        return Type.bool();
+      }
+      if (spannerType.equals("INT64")) {
+        return Type.int64();
+      }
+      if (spannerType.equals("FLOAT64")) {
+        return Type.float64();
+      }
+      if (spannerType.startsWith("STRING")) {
+        return Type.string();
+      }
+      if (spannerType.startsWith("BYTES")) {
+        return Type.bytes();
+      }
+      if (spannerType.equals("TIMESTAMP")) {
+        return Type.timestamp();
+      }
+      if (spannerType.equals("DATE")) {
+        return Type.date();
+      }
+
+      // Require the full "ARRAY<...>" shape so that malformed input reaches the
+      // IllegalArgumentException below instead of failing inside substring().
+      if (spannerType.startsWith("ARRAY<") && spannerType.endsWith(">")) {
+        // Strip the leading "ARRAY<" (6 characters) and the trailing ">".
+        String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
+        Type itemType = parseSpannerType(spannerArrayType);
+        return Type.array(itemType);
+      }
+      throw new IllegalArgumentException("Unknown spanner type " + spannerType);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
new file mode 100644
index 0000000..25dc6dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSets;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * A test of {@link ReadSpannerSchema}.
+ */
+public class ReadSpannerSchemaTest {
+
+  @Rule
+  public final transient ExpectedException thrown = ExpectedException.none();
+
+  private FakeServiceFactory serviceFactory;
+
+  /** Builds one row of the column metadata query result. */
+  private static Struct columnMetadata(String tableName, String columnName, String type) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("spanner_type", Value.string(type))
+        .build();
+  }
+
+  /** Builds one row of the primary key metadata query result. */
+  private static Struct pkMetadata(String tableName, String columnName, String ordering) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("column_ordering", Value.string(ordering))
+        .build();
+  }
+
+  /** Matches any {@link Statement} whose SQL text contains {@code fragment}. */
+  private static ArgumentMatcher<Statement> sqlContaining(final String fragment) {
+    return new ArgumentMatcher<Statement>() {
+
+      @Override public boolean matches(Object argument) {
+        if (!(argument instanceof Statement)) {
+          return false;
+        }
+        Statement st = (Statement) argument;
+        return st.getSql().contains(fragment);
+      }
+    };
+  }
+
+  /** Makes {@code tx} answer the 'information_schema.columns' query with {@code rows}. */
+  private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("spanner_type", Type.string()));
+    when(tx.executeQuery(argThat(sqlContaining("information_schema.columns"))))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  /** Makes {@code tx} answer the 'information_schema.index_columns' query with {@code rows}. */
+  private void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("column_ordering", Type.string()));
+    when(tx.executeQuery(argThat(sqlContaining("information_schema.index_columns"))))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    serviceFactory = new FakeServiceFactory();
+  }
+
+  @Test
+  public void simple() throws Exception {
+    // Simplest schema: a table with int64 key
+    ReadOnlyTransaction tx = mock(ReadOnlyTransaction.class);
+    when(serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn(tx);
+
+    preparePkMetadata(tx, Arrays.asList(pkMetadata("test", "key", "ASC")));
+    prepareColumnMetadata(tx, Arrays.asList(columnMetadata("test", "key", "INT64")));
+
+    SpannerConfig config = SpannerConfig.create().withProjectId("test-project")
+        .withInstanceId("test-instance").withDatabaseId("test-database")
+        .withServiceFactory(serviceFactory);
+
+    DoFnTester<Void, SpannerSchema> tester = DoFnTester.of(new ReadSpannerSchema(config));
+    List<SpannerSchema> schemas = tester.processBundle(Arrays.asList((Void) null));
+
+    assertEquals(1, schemas.size());
+
+    SpannerSchema schema = schemas.get(0);
+
+    assertEquals(1, schema.getTables().size());
+
+    SpannerSchema.Column column = SpannerSchema.Column.create("key", Type.int64());
+    SpannerSchema.KeyPart keyPart = SpannerSchema.KeyPart.create("key", false);
+
+    assertThat(schema.getColumns("test"), contains(column));
+    assertThat(schema.getKeyParts("test"), contains(keyPart));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
new file mode 100644
index 0000000..fcb23dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for the {@link SpannerSchema} builder and its per-table accessors.
+ */
+public class SpannerSchemaTest {
+
+  /** Asserts the number of columns and key parts registered for {@code table}. */
+  private static void assertTableShape(
+      SpannerSchema schema, String table, int expectedColumns, int expectedKeyParts) {
+    assertEquals(expectedColumns, schema.getColumns(table).size());
+    assertEquals(expectedKeyParts, schema.getKeyParts(table).size());
+  }
+
+  @Test
+  public void testSingleTable() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    builder.addColumn("test", "pk", "STRING(48)");
+    builder.addKeyPart("test", "pk", false);
+    builder.addColumn("test", "maxKey", "STRING(MAX)");
+    SpannerSchema schema = builder.build();
+
+    // One table with two columns, one of which is the key.
+    assertEquals(1, schema.getTables().size());
+    assertTableShape(schema, "test", 2, 1);
+  }
+
+  @Test
+  public void testTwoTables() throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+
+    // First table: string key, ascending.
+    builder.addColumn("test", "pk", "STRING(48)");
+    builder.addKeyPart("test", "pk", false);
+    builder.addColumn("test", "maxKey", "STRING(MAX)");
+
+    // Second table: int64 key, descending.
+    builder.addColumn("other", "pk", "INT64");
+    builder.addKeyPart("other", "pk", true);
+    builder.addColumn("other", "maxKey", "STRING(MAX)");
+
+    SpannerSchema schema = builder.build();
+
+    assertEquals(2, schema.getTables().size());
+    assertTableShape(schema, "test", 2, 1);
+    assertTableShape(schema, "other", 2, 1);
+  }
+}
[2/2] beam git commit: This closes #4013: [BEAM-1542] A transform for
reading the Spanner schema
Posted by jk...@apache.org.
This closes #4013: [BEAM-1542] A transform for reading the Spanner schema
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e686286f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e686286f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e686286f
Branch: refs/heads/master
Commit: e686286f10036b476ecaaabd935b8ec37dfb5616
Parents: a6f69bd 1fd027b
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Fri Oct 27 14:46:04 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 27 14:46:04 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/ReadSpannerSchema.java | 94 ++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerSchema.java | 144 +++++++++++++++++++
.../io/gcp/spanner/ReadSpannerSchemaTest.java | 134 +++++++++++++++++
.../sdk/io/gcp/spanner/SpannerSchemaTest.java | 61 ++++++++
4 files changed, 433 insertions(+)
----------------------------------------------------------------------