You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/24 21:13:04 UTC
[6/7] beam git commit: Delete SpannerCSVLoader
Delete SpannerCSVLoader
This is not appropriate for examples. SpannerIO should be well-javadoced
and integration tested.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/216b972b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/216b972b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/216b972b
Branch: refs/heads/master
Commit: 216b972b6b09c0bb612d7f716cfec68ace9777ec
Parents: 8871ead
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 10:48:24 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 24 14:12:52 2017 -0700
----------------------------------------------------------------------
.../beam/examples/spanner/SpannerCSVLoader.java | 143 -------------------
1 file changed, 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/216b972b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
deleted file mode 100644
index eee581d..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.spanner;
-
-import com.google.cloud.spanner.Database;
-import com.google.cloud.spanner.DatabaseAdminClient;
-import com.google.cloud.spanner.Mutation;
-import com.google.cloud.spanner.Operation;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerException;
-import com.google.cloud.spanner.SpannerOptions;
-import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
-import java.util.Collections;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-
-
-/**
- * Generalized bulk loader for importing CSV files into Spanner.
- *
- */
-public class SpannerCSVLoader {
-
- /**
- * Command options specification.
- */
- private interface Options extends PipelineOptions {
- @Description("Create a sample database")
- @Default.Boolean(false)
- boolean isCreateDatabase();
- void setCreateDatabase(boolean createDatabase);
-
- @Description("File to read from ")
- @Validation.Required
- String getInput();
- void setInput(String value);
-
- @Description("Instance ID to write to in Spanner")
- @Validation.Required
- String getInstanceId();
- void setInstanceId(String value);
-
- @Description("Database ID to write to in Spanner")
- @Validation.Required
- String getDatabaseId();
- void setDatabaseId(String value);
-
- @Description("Table name")
- @Validation.Required
- String getTable();
- void setTable(String value);
- }
-
-
- /**
- * Constructs and executes the processing pipeline based upon command options.
- */
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- Pipeline p = Pipeline.create(options);
- PCollection<String> lines = p.apply(TextIO.Read.from(options.getInput()));
- PCollection<Mutation> mutations = lines
- .apply(ParDo.of(new NaiveParseCsvFn(options.getTable())));
- mutations
- .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
- p.run().waitUntilFinish();
- }
-
- public static void createDatabase(Options options) {
- Spanner client = SpannerOptions.getDefaultInstance().getService();
-
- DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient();
- try {
- databaseAdminClient.dropDatabase(options.getInstanceId(), options
- .getDatabaseId());
- } catch (SpannerException e) {
- // Does not exist, ignore.
- }
- Operation<Database, CreateDatabaseMetadata> op = databaseAdminClient.createDatabase(
- options.getInstanceId(), options
- .getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " ("
- + " Key INT64,"
- + " Name STRING,"
- + " Email STRING,"
- + " Age INT,"
- + ") PRIMARY KEY (Key)"));
- op.waitFor();
- }
-
-
- /**
- * A DoFn that creates a Spanner Mutation for each CSV line.
- */
- static class NaiveParseCsvFn extends DoFn<String, Mutation> {
- private final String table;
-
- NaiveParseCsvFn(String table) {
- this.table = table;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String line = c.element();
- String[] elements = line.split(",");
- if (elements.length != 4) {
- return;
- }
- Mutation mutation = Mutation.newInsertOrUpdateBuilder(table)
- .set("Key").to(Long.valueOf(elements[0]))
- .set("Name").to(elements[1])
- .set("Email").to(elements[2])
- .set("Age").to(Integer.valueOf(elements[3]))
- .build();
- c.output(mutation);
- }
- }
-}