You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/27 21:46:28 UTC

[1/2] beam git commit: Reading spanner schema transform

Repository: beam
Updated Branches:
  refs/heads/master a6f69bd12 -> e686286f1


Reading spanner schema transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fd027b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fd027b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fd027b2

Branch: refs/heads/master
Commit: 1fd027b206977fbe5ff6011e3b006836087f5d08
Parents: a6f69bd
Author: Mairbek Khadikov <ma...@google.com>
Authored: Wed Oct 18 15:18:55 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 27 14:44:30 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/ReadSpannerSchema.java   |  94 ++++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerSchema.java  | 144 +++++++++++++++++++
 .../io/gcp/spanner/ReadSpannerSchemaTest.java   | 134 +++++++++++++++++
 .../sdk/io/gcp/spanner/SpannerSchemaTest.java   |  61 ++++++++
 4 files changed, 433 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
new file mode 100644
index 0000000..e2ade68
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * This {@link DoFn} reads Cloud Spanner 'information_schema.*' tables to build the
+ * {@link SpannerSchema}.
+ */
+class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
+
+  private final SpannerConfig config;
+
+  // Transient, so it is not serialized with the DoFn; assigned in setup().
+  private transient SpannerAccessor spannerAccessor;
+
+  public ReadSpannerSchema(SpannerConfig config) {
+    this.config = config;
+  }
+
+  /** Opens the Cloud Spanner connection described by the config. */
+  @Setup
+  public void setup() throws Exception {
+    spannerAccessor = config.connectToSpanner();
+  }
+
+  /** Closes the connection opened by {@link #setup()}, if there is one. */
+  @Teardown
+  public void teardown() throws Exception {
+    // Guard against teardown running when setup() never assigned the accessor.
+    if (spannerAccessor != null) {
+      spannerAccessor.close();
+      spannerAccessor = null;
+    }
+  }
+
+  /**
+   * Reads the column and primary key metadata and outputs a single {@link SpannerSchema}
+   * built from it. The input element itself is not inspected.
+   */
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    SpannerSchema.Builder builder = SpannerSchema.builder();
+    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
+    // Both queries are issued through the same read-only transaction.
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      try (ResultSet columns = readTableInfo(tx)) {
+        while (columns.next()) {
+          String tableName = columns.getString(0);
+          String columnName = columns.getString(1);
+          String type = columns.getString(2);
+
+          builder.addColumn(tableName, columnName, type);
+        }
+      }
+
+      try (ResultSet keyParts = readPrimaryKeyInfo(tx)) {
+        while (keyParts.next()) {
+          String tableName = keyParts.getString(0);
+          String columnName = keyParts.getString(1);
+          String ordering = keyParts.getString(2);
+
+          // Case-insensitive and locale-independent check of the ordering.
+          builder.addKeyPart(tableName, columnName, "DESC".equalsIgnoreCase(ordering));
+        }
+      }
+    }
+    c.output(builder.build());
+  }
+
+  /**
+   * Queries {@code information_schema.columns} for the table name, column name and Spanner
+   * type of every column whose catalog and schema are empty strings, ordered by table name
+   * and ordinal position.
+   */
+  private ResultSet readTableInfo(ReadOnlyTransaction tx) {
+    return tx.executeQuery(Statement.of(
+        "SELECT c.table_name, c.column_name, c.spanner_type"
+            + " FROM information_schema.columns as c"
+            + " WHERE c.table_catalog = '' AND c.table_schema = ''"
+            + " ORDER BY c.table_name, c.ordinal_position"));
+  }
+
+  /**
+   * Queries {@code information_schema.index_columns} for the table name, column name and
+   * column ordering of every {@code PRIMARY_KEY} index column whose catalog and schema are
+   * empty strings, ordered by table name and ordinal position.
+   */
+  private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
+    return tx.executeQuery(Statement
+        .of("SELECT t.table_name, t.column_name, t.column_ordering"
+            + " FROM information_schema.index_columns AS t "
+            + " WHERE t.index_name = 'PRIMARY_KEY' AND t.table_catalog = ''"
+            + " AND t.table_schema = ''"
+            + " ORDER BY t.table_name, t.ordinal_position"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
new file mode 100644
index 0000000..4c12b8d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.Type;
+import com.google.common.collect.ArrayListMultimap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Encapsulates Cloud Spanner Schema.
+ */
+class SpannerSchema implements Serializable {
+  private final List<String> tables;
+  private final ArrayListMultimap<String, Column> columns;
+  private final ArrayListMultimap<String, KeyPart> keyParts;
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link SpannerSchema}.
+   */
+  static class Builder {
+    private final ArrayListMultimap<String, Column> columns = ArrayListMultimap.create();
+    private final ArrayListMultimap<String, KeyPart> keyParts = ArrayListMultimap.create();
+
+    public Builder addColumn(String table, String name, String type) {
+      addColumn(table, Column.create(name.toLowerCase(), type));
+      return this;
+    }
+
+    private Builder addColumn(String table, Column column) {
+      columns.put(table.toLowerCase(), column);
+      return this;
+    }
+
+    public Builder addKeyPart(String table, String column, boolean desc) {
+      keyParts.put(table, KeyPart.create(column.toLowerCase(), desc));
+      return this;
+    }
+
+    public SpannerSchema build() {
+      return new SpannerSchema(columns, keyParts);
+    }
+  }
+
+  private SpannerSchema(ArrayListMultimap<String, Column> columns,
+      ArrayListMultimap<String, KeyPart> keyParts) {
+    this.columns = columns;
+    this.keyParts = keyParts;
+    tables = new ArrayList<>(columns.keySet());
+  }
+
+  public List<String> getTables() {
+    return tables;
+  }
+
+  public List<Column> getColumns(String table) {
+    return columns.get(table);
+  }
+
+  public List<KeyPart> getKeyParts(String table) {
+    return keyParts.get(table);
+  }
+
+  @AutoValue
+  abstract static class KeyPart implements Serializable {
+    static KeyPart create(String field, boolean desc) {
+      return new AutoValue_SpannerSchema_KeyPart(field, desc);
+    }
+
+    abstract String getField();
+
+    abstract boolean isDesc();
+  }
+
+  @AutoValue
+  abstract static class Column implements Serializable {
+
+    static Column create(String name, Type type) {
+      return new AutoValue_SpannerSchema_Column(name, type);
+    }
+
+    static Column create(String name, String spannerType) {
+      return create(name, parseSpannerType(spannerType));
+    }
+
+    public abstract String getName();
+
+    public abstract Type getType();
+
+    private static Type parseSpannerType(String spannerType) {
+      spannerType = spannerType.toUpperCase();
+      if (spannerType.equals("BOOL")) {
+        return Type.bool();
+      }
+      if (spannerType.equals("INT64")) {
+        return Type.int64();
+      }
+      if (spannerType.equals("FLOAT64")) {
+        return Type.float64();
+      }
+      if (spannerType.startsWith("STRING")) {
+        return Type.string();
+      }
+      if (spannerType.startsWith("BYTES")) {
+        return Type.bytes();
+      }
+      if (spannerType.equals("TIMESTAMP")) {
+        return Type.timestamp();
+      }
+      if (spannerType.equals("DATE")) {
+        return Type.date();
+      }
+
+      if (spannerType.startsWith("ARRAY")) {
+        // Substring "ARRAY<xxx>"
+        String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
+        Type itemType = parseSpannerType(spannerArrayType);
+        return Type.array(itemType);
+      }
+      throw new IllegalArgumentException("Unknown spanner type " + spannerType);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
new file mode 100644
index 0000000..25dc6dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSets;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * A test of {@link ReadSpannerSchema}.
+ */
+public class ReadSpannerSchemaTest {
+
+  @Rule
+  public final transient ExpectedException thrown = ExpectedException.none();
+
+  private FakeServiceFactory serviceFactory;
+  private ReadOnlyTransaction mockTx;
+
+  /** Builds one row of the column metadata query result. */
+  private static Struct columnMetadata(String tableName, String columnName, String type) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("spanner_type", Value.string(type))
+        .build();
+  }
+
+  /** Builds one row of the primary key metadata query result. */
+  private static Struct pkMetadata(String tableName, String columnName, String ordering) {
+    return Struct.newBuilder().add("table_name", Value.string(tableName))
+        .add("column_name", Value.string(columnName)).add("column_ordering", Value.string(ordering))
+        .build();
+  }
+
+  /** Matches any {@link Statement} whose SQL text contains the given fragment. */
+  private static ArgumentMatcher<Statement> sqlContaining(final String fragment) {
+    return new ArgumentMatcher<Statement>() {
+
+      @Override public boolean matches(Object argument) {
+        if (!(argument instanceof Statement)) {
+          return false;
+        }
+        Statement st = (Statement) argument;
+        return st.getSql().contains(fragment);
+      }
+    };
+  }
+
+  /** Makes queries against {@code information_schema.columns} return the given rows. */
+  private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("spanner_type", Type.string()));
+    when(tx.executeQuery(argThat(sqlContaining("information_schema.columns"))))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  /** Makes queries against {@code information_schema.index_columns} return the given rows. */
+  private void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
+    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
+        Type.StructField.of("column_name", Type.string()),
+        Type.StructField.of("column_ordering", Type.string()));
+    when(tx.executeQuery(argThat(sqlContaining("information_schema.index_columns"))))
+        .thenReturn(ResultSets.forRows(type, rows));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    serviceFactory = new FakeServiceFactory();
+    mockTx = mock(ReadOnlyTransaction.class);
+    // Every read-only transaction handed out by the fake client is the mock above.
+    when(serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn(mockTx);
+  }
+
+  @Test
+  public void simple() throws Exception {
+    // Simplest schema: a table with int64 key
+    preparePkMetadata(mockTx, Arrays.asList(pkMetadata("test", "key", "ASC")));
+    prepareColumnMetadata(mockTx, Arrays.asList(columnMetadata("test", "key", "INT64")));
+
+    SpannerConfig config = SpannerConfig.create().withProjectId("test-project")
+        .withInstanceId("test-instance").withDatabaseId("test-database")
+        .withServiceFactory(serviceFactory);
+
+    DoFnTester<Void, SpannerSchema> tester = DoFnTester.of(new ReadSpannerSchema(config));
+    List<SpannerSchema> schemas = tester.processBundle(Arrays.asList((Void) null));
+
+    assertEquals(1, schemas.size());
+
+    SpannerSchema schema = schemas.get(0);
+
+    // Checks both the number of tables and the table name.
+    assertThat(schema.getTables(), contains("test"));
+
+    SpannerSchema.Column column = SpannerSchema.Column.create("key", Type.int64());
+    SpannerSchema.KeyPart keyPart = SpannerSchema.KeyPart.create("key", false);
+
+    assertThat(schema.getColumns("test"), contains(column));
+    assertThat(schema.getKeyParts("test"), contains(keyPart));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1fd027b2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
new file mode 100644
index 0000000..fcb23dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * A test of {@link SpannerSchema}.
+ */
+public class SpannerSchemaTest {
+
+  @Test
+  public void testSingleTable() throws Exception {
+    SpannerSchema schema = SpannerSchema.builder()
+        .addColumn("test", "pk", "STRING(48)")
+        .addKeyPart("test", "pk", false)
+        .addColumn("test", "maxKey", "STRING(MAX)").build();
+
+    assertEquals(1, schema.getTables().size());
+    assertEquals(2, schema.getColumns("test").size());
+    assertEquals(1, schema.getKeyParts("test").size());
+  }
+
+  @Test
+  public void testTwoTables() throws Exception {
+    SpannerSchema schema = SpannerSchema.builder()
+        .addColumn("test", "pk", "STRING(48)")
+        .addKeyPart("test", "pk", false)
+        .addColumn("test", "maxKey", "STRING(MAX)")
+
+        .addColumn("other", "pk", "INT64")
+        .addKeyPart("other", "pk", true)
+        .addColumn("other", "maxKey", "STRING(MAX)")
+
+        .build();
+
+    assertEquals(2, schema.getTables().size());
+    assertEquals(2, schema.getColumns("test").size());
+    assertEquals(1, schema.getKeyParts("test").size());
+
+    assertEquals(2, schema.getColumns("other").size());
+    assertEquals(1, schema.getKeyParts("other").size());
+  }
+}


[2/2] beam git commit: This closes #4013: [BEAM-1542] A transform for reading the Spanner schema

Posted by jk...@apache.org.
This closes #4013: [BEAM-1542] A transform for reading the Spanner schema


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e686286f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e686286f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e686286f

Branch: refs/heads/master
Commit: e686286f10036b476ecaaabd935b8ec37dfb5616
Parents: a6f69bd 1fd027b
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Fri Oct 27 14:46:04 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 27 14:46:04 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/ReadSpannerSchema.java   |  94 ++++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerSchema.java  | 144 +++++++++++++++++++
 .../io/gcp/spanner/ReadSpannerSchemaTest.java   | 134 +++++++++++++++++
 .../sdk/io/gcp/spanner/SpannerSchemaTest.java   |  61 ++++++++
 4 files changed, 433 insertions(+)
----------------------------------------------------------------------