Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/02 18:07:10 UTC

[GitHub] [beam] lgajowy commented on a change in pull request #12117: [BEAM-10343] Add dispositions for SnowflakeIO.write

lgajowy commented on a change in pull request #12117:
URL: https://github.com/apache/beam/pull/12117#discussion_r449152583



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -155,7 +157,7 @@
  * items.apply(
  *     SnowflakeIO.<KV<Integer, String>>write()
  *         .withDataSourceConfiguration(dataSourceConfiguration)
- *         .withTable(table)
+ *         .toTable(table)

Review comment:
       nit: could you move this `toTable()` call below all `with...` calls? 

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -593,7 +606,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
      *
      * @param table - String with the name of the table.
      */
-    public Write<T> withTable(String table) {
+    public Write<T> toTable(String table) {

Review comment:
       Rename the `toTable()` method to `to()`? That seems more consistent with other IOs, e.g. BigQueryIO. You already take a `String table` argument and the docs tell what the subject of this operation is, so it's all clear. :)
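
       To illustrate the naming suggested here, a minimal sketch. `WriteSketch` and all of its methods are hypothetical stand-ins, not the actual SnowflakeIO API; the only point is that configuration setters keep the `with...` prefix while the destination setter is a plain `to()` called last.

```java
// Hypothetical stand-in for a fluent write builder; NOT the real SnowflakeIO API.
final class WriteSketch {
  private final String config;
  private final String table;

  private WriteSketch(String config, String table) {
    this.config = config;
    this.table = table;
  }

  static WriteSketch write() {
    return new WriteSketch(null, null);
  }

  // Configuration setters keep the with... prefix.
  WriteSketch withDataSourceConfiguration(String config) {
    return new WriteSketch(config, this.table);
  }

  // The destination setter is named to() and reads naturally at the end of the chain.
  WriteSketch to(String table) {
    return new WriteSketch(this.config, table);
  }

  String describe() {
    return "config=" + config + ", table=" + table;
  }
}
```

       A call chain would then read `WriteSketch.write().withDataSourceConfiguration("cfg").to("MY_TABLE")`.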

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServiceImpl.java
##########
@@ -169,6 +191,54 @@ private void prepareTableAccordingWriteDisposition(
     }
   }
 
+  private void createTableIfNotExists(
+      DataSource dataSource, String table, SnowflakeTableSchema tableSchema) throws SQLException {
+    String query =
+        String.format(
+            "SELECT EXISTS (SELECT 1 FROM  information_schema.tables  WHERE  table_name = '%s');",

Review comment:
       Are the double spaces needed here?
   
   ```suggestion
               "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s');",
   ```

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/text/SnowflakeVarchar.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.data.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.io.snowflake.data.SnowflakeDataType;
+
+public class SnowflakeVarchar implements SnowflakeDataType, Serializable {
+  private static final Long MAX_LENGTH = 16777216L;
+  private Long length;
+
+  public static SnowflakeVarchar of() {
+    return new SnowflakeVarchar();
+  }
+
+  public static SnowflakeVarchar of(long length) {
+    return new SnowflakeVarchar(length);
+  }
+
+  public SnowflakeVarchar() {}
+
+  public SnowflakeVarchar(long length) {
+    if (length > MAX_LENGTH) {
+      throw new IllegalArgumentException();

Review comment:
       (Same as for `SnowflakeBinary`: does it make sense to add a message for the exception?)

##########
File path: sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeBooleanTest.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.test.unit.data;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.snowflake.data.logical.SnowflakeBoolean;
+import org.junit.Test;
+
+public class SnowflakeBooleanTest {

Review comment:
       I wonder if there's a way of testing all the Snowflake data types in a single test class, e.g. `SnowflakeDataTypeTest` (parametrized test?). :thinking: Worth trying imho, but I didn't check this.
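
       A rough sketch of what a single table-driven check could look like, in plain Java so it stays self-contained. Everything here is an assumption for illustration: `DataTypeSketch`, `BooleanSketch`, `VarcharSketch`, the `sql()` method and the expected strings are hypothetical stand-ins, not the classes from this PR. With JUnit 4, the same table of cases could instead feed the `Parameterized` runner.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins; the real data type classes and their methods may differ.
interface DataTypeSketch {
  String sql();
}

final class BooleanSketch implements DataTypeSketch {
  public String sql() {
    return "BOOLEAN";
  }
}

final class VarcharSketch implements DataTypeSketch {
  private final long length;

  VarcharSketch(long length) {
    this.length = length;
  }

  public String sql() {
    return String.format("VARCHAR(%d)", length);
  }
}

final class DataTypeTableCheck {
  // Each entry pairs the expected SQL string with the type that should render it.
  static Map<String, DataTypeSketch> cases() {
    Map<String, DataTypeSketch> cases = new LinkedHashMap<>();
    cases.put("BOOLEAN", new BooleanSketch());
    cases.put("VARCHAR(10)", new VarcharSketch(10));
    return cases;
  }

  // Runs every case and returns the number of mismatches.
  static int countFailures(Map<String, DataTypeSketch> cases) {
    int failures = 0;
    for (Map.Entry<String, DataTypeSketch> entry : cases.entrySet()) {
      if (!entry.getKey().equals(entry.getValue().sql())) {
        failures++;
      }
    }
    return failures;
  }
}
```

       Adding a new data type then means adding one entry to the table rather than one more test class.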

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/text/SnowflakeBinary.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.data.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.io.snowflake.data.SnowflakeDataType;
+
+public class SnowflakeBinary implements SnowflakeDataType, Serializable {
+
+  private static final Long MAX_SIZE = 8388608L;
+
+  private Long size; // bytes
+
+  public SnowflakeBinary() {}
+
+  public static SnowflakeBinary of() {
+    return new SnowflakeBinary();
+  }
+
+  public static SnowflakeBinary of(long size) {
+    return new SnowflakeBinary(size);
+  }
+
+  public SnowflakeBinary(long size) {
+    if (size > MAX_SIZE) {
+      throw new IllegalArgumentException();

Review comment:
       Does it make sense to add a message for the exception? Something like "Snowflake binary maximum size exceeded."
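
       For example, something along these lines. This is only a sketch: `BinarySizeCheck` and `validate` are hypothetical names, and the exact wording of the message is an assumption; only the 8388608 limit is taken from the diff.

```java
// Hypothetical stand-in for the constructor check; only the limit comes from the diff.
final class BinarySizeCheck {
  private static final long MAX_SIZE = 8388608L;

  // Returns the size unchanged if it is within the limit, otherwise throws
  // with a message that states both the offending value and the limit.
  static long validate(long size) {
    if (size > MAX_SIZE) {
      throw new IllegalArgumentException(
          String.format(
              "Snowflake binary maximum size exceeded: got %d bytes, limit is %d.",
              size, MAX_SIZE));
    }
    return size;
  }
}
```

       Including the actual value and the limit in the message makes the failure easier to diagnose from a stack trace alone.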

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServiceImpl.java
##########
@@ -25,7 +27,9 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.sql.DataSource;
+import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
 import org.apache.beam.sdk.io.snowflake.enums.CloudProvider;

Review comment:
       `CloudProvider` is used only in `SnowflakeServiceImpl`. I think it makes sense to keep it inside that class as a private enum, wdyt?
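
       Roughly what I have in mind, as a sketch only. `ServiceSketch`, `providerFor`, the enum constants and the URL prefixes are illustrative assumptions and may not match the actual enum in this PR.

```java
// Hypothetical stand-in for the service class; constants and prefixes are illustrative.
final class ServiceSketch {
  // Nested and private: nothing outside this class can reference the enum.
  private enum CloudProvider {
    GCP("gs://"),
    AWS("s3://");

    private final String prefix;

    CloudProvider(String prefix) {
      this.prefix = prefix;
    }
  }

  // Resolves the provider name from a staging bucket URL by matching its prefix.
  static String providerFor(String stagingBucket) {
    for (CloudProvider provider : CloudProvider.values()) {
      if (stagingBucket.startsWith(provider.prefix)) {
        return provider.name();
      }
    }
    throw new IllegalArgumentException("Unsupported staging bucket: " + stagingBucket);
  }
}
```

       This keeps the enum out of the public surface of the module while the service can still use it freely.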



