Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/09/01 17:21:00 UTC

[jira] [Work logged] (BEAM-10669) Add support for Dataflow Templates

     [ https://issues.apache.org/jira/browse/BEAM-10669?focusedWorklogId=477350&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-477350 ]

ASF GitHub Bot logged work on BEAM-10669:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Sep/20 17:20
            Start Date: 01/Sep/20 17:20
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #12618:
URL: https://github.com/apache/beam/pull/12618#discussion_r481301572



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -1623,4 +1867,12 @@ public DataSourceConfiguration getConfig() {
       return this.config;
     }
   }
+
+  private static String getValueOrNull(ValueProvider<String> valueProvider) {
+    return valueProvider != null ? valueProvider.get() : null;
+  }
+
+  private static boolean isNotEmpty(ValueProvider<String> valueProvider) {
+    return valueProvider != null && valueProvider.get() != null && !valueProvider.get().isEmpty();

Review comment:
       I see you're using this method at pipeline construction time. Note that RuntimeValueProviders will throw an exception when you call `get` on them at construction time.
   
   [The documentation is poor, but it'll throw `IllegalStateException`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java#L246-L270), so you also need to call `valueProvider.isAccessible()` before calling `get` on it. If it's not yet accessible, that does not tell you whether it's empty or not. It just means that we'll only know later whether it's empty.
   
   VP javadoc: https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/options/ValueProvider.html
   
   I hope I did not confuse you more here : )
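
A minimal sketch of the check described in this comment, under stated assumptions: `SketchValueProvider` is a hypothetical stand-in that mirrors only the `get` and `isAccessible` methods of Beam's `ValueProvider`, and `ValueProviderChecks`, `Emptiness`, `staticValue` and `runtimeValue` are illustrative names that do not exist in SnowflakeIO. The point is that "not accessible yet" is a third state, distinct from both "empty" and "not empty".

```java
// Hypothetical stand-in for Beam's ValueProvider, reduced to the two methods discussed above.
interface SketchValueProvider<T> {
  T get();

  boolean isAccessible();
}

final class ValueProviderChecks {
  // Three outcomes instead of a boolean: UNKNOWN means "we will only know at runtime".
  enum Emptiness {
    EMPTY,
    NOT_EMPTY,
    UNKNOWN
  }

  static Emptiness emptiness(SketchValueProvider<String> provider) {
    if (provider == null) {
      return Emptiness.EMPTY;
    }
    if (!provider.isAccessible()) {
      // Never call get() here: a runtime provider would throw at construction time.
      return Emptiness.UNKNOWN;
    }
    String value = provider.get();
    return (value == null || value.isEmpty()) ? Emptiness.EMPTY : Emptiness.NOT_EMPTY;
  }

  // Behaves like a value that is already known at construction time.
  static SketchValueProvider<String> staticValue(String value) {
    return new SketchValueProvider<String>() {
      @Override
      public String get() {
        return value;
      }

      @Override
      public boolean isAccessible() {
        return true;
      }
    };
  }

  // Behaves like a template parameter that is only filled in when the job runs.
  static SketchValueProvider<String> runtimeValue() {
    return new SketchValueProvider<String>() {
      @Override
      public String get() {
        throw new IllegalStateException("Value only available at runtime");
      }

      @Override
      public boolean isAccessible() {
        return false;
      }
    };
  }
}
```

With this shape, a caller at construction time can treat `UNKNOWN` as "skip the check for now" rather than as either branch of a boolean.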

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -390,6 +407,12 @@
      * @return
      */
     public Read<T> withQuotationMark(String quotationMark) {
+      return toBuilder()
+          .setStorageIntegrationName(ValueProvider.StaticValueProvider.of(quotationMark))

Review comment:
       This should be setQuotationMark?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -1532,26 +1767,47 @@ public DataSource buildDatasource() {
         SnowflakeBasicDataSource basicDataSource = new SnowflakeBasicDataSource();
         basicDataSource.setUrl(buildUrl());
 
-        if (getUsername() != null) {
-          basicDataSource.setUser(getUsername());
+        if (isNotEmpty(getOauthToken())) {
+          basicDataSource.setOauthToken(getOauthToken().get());
+        } else if (isNotEmpty(getUsername()) && getPrivateKey() != null) {
+          basicDataSource.setUser(getUsername().get());
+          basicDataSource.setPrivateKey(getPrivateKey());
+        } else if (isNotEmpty(getUsername())
+            && isNotEmpty(getPrivateKeyPassphrase())
+            && isNotEmpty(getRawPrivateKey())) {
+          PrivateKey privateKey =
+              KeyPairUtils.preparePrivateKey(
+                  getRawPrivateKey().get(), getPrivateKeyPassphrase().get());
+          basicDataSource.setPrivateKey(privateKey);
+          basicDataSource.setUser(getUsername().get());
+

Review comment:
       These data are sensitive, so maybe we should not show them so readily. They are part of the create job request, and are also visible in the UI (to anyone who has access to the job, including support engineers). To be fair, PipelineOptions are also visible in the UI. Are you sure it makes sense to make them display data?
   
   I don't have strong opinions either way, but I think it makes sense to reflect on what the best approach is.

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -1302,123 +1381,249 @@ public void finishBundle() throws Exception {
     }
   }
 
-  private static String getValueOrNull(ValueProvider<String> valueProvider) {
-    return valueProvider != null ? valueProvider.get() : null;
-  }
-
   /**
    * A POJO describing a {@link DataSource}, providing all properties allowing to create a {@link
    * DataSource}.
    */
   @AutoValue
   public abstract static class DataSourceConfiguration implements Serializable {
+    @Nullable
+    public abstract String getUrl();
 
-    public abstract @Nullable String getUrl();
+    @Nullable
+    public abstract ValueProvider<String> getUsername();
 
-    public abstract @Nullable String getUsername();
+    @Nullable
+    public abstract ValueProvider<String> getPassword();
 
-    public abstract @Nullable String getPassword();
+    @Nullable
+    public abstract PrivateKey getPrivateKey();
 
-    public abstract @Nullable PrivateKey getPrivateKey();
+    @Nullable
+    public abstract String getPrivateKeyPath();
 
-    public abstract @Nullable String getOauthToken();
+    @Nullable
+    public abstract ValueProvider<String> getRawPrivateKey();
 
-    public abstract @Nullable String getDatabase();
+    @Nullable
+    public abstract ValueProvider<String> getPrivateKeyPassphrase();
 
-    public abstract @Nullable String getWarehouse();
+    @Nullable
+    public abstract ValueProvider<String> getOauthToken();
 
-    public abstract @Nullable String getSchema();
+    @Nullable
+    public abstract ValueProvider<String> getDatabase();
 
-    public abstract @Nullable String getServerName();
+    @Nullable
+    public abstract ValueProvider<String> getWarehouse();
 
-    public abstract @Nullable Integer getPortNumber();
+    @Nullable
+    public abstract ValueProvider<String> getSchema();
 
-    public abstract @Nullable String getRole();
+    @Nullable
+    public abstract ValueProvider<String> getServerName();
 
-    public abstract @Nullable Integer getLoginTimeout();
+    @Nullable
+    public abstract Integer getPortNumber();
 
-    public abstract @Nullable Boolean getSsl();
+    @Nullable
+    public abstract ValueProvider<String> getRole();
 
-    public abstract @Nullable Boolean getValidate();
+    @Nullable
+    public abstract String getAuthenticator();
 
-    public abstract @Nullable DataSource getDataSource();
+    @Nullable
+    public abstract Integer getLoginTimeout();
+
+    @Nullable
+    public abstract Boolean getSsl();
+
+    @Nullable
+    public abstract DataSource getDataSource();
 
     abstract Builder builder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUrl(String url);
 
-      abstract Builder setUsername(String username);
+      abstract Builder setUsername(ValueProvider<String> username);
 
-      abstract Builder setPassword(String password);
+      abstract Builder setPassword(ValueProvider<String> password);
 
       abstract Builder setPrivateKey(PrivateKey privateKey);
 
-      abstract Builder setOauthToken(String oauthToken);
+      abstract Builder setPrivateKeyPath(String privateKeyPath);
+
+      abstract Builder setRawPrivateKey(ValueProvider<String> rawPrivateKey);
+
+      abstract Builder setPrivateKeyPassphrase(ValueProvider<String> privateKeyPassphrase);
 
-      abstract Builder setDatabase(String database);
+      abstract Builder setOauthToken(ValueProvider<String> oauthToken);
 
-      abstract Builder setWarehouse(String warehouse);
+      abstract Builder setDatabase(ValueProvider<String> database);
 
-      abstract Builder setSchema(String schema);
+      abstract Builder setWarehouse(ValueProvider<String> warehouse);
 
-      abstract Builder setServerName(String serverName);
+      abstract Builder setSchema(ValueProvider<String> schema);
+
+      abstract Builder setServerName(ValueProvider<String> serverName);
 
       abstract Builder setPortNumber(Integer portNumber);
 
-      abstract Builder setRole(String role);
+      abstract Builder setRole(ValueProvider<String> role);
+
+      abstract Builder setAuthenticator(String authenticator);
 
       abstract Builder setLoginTimeout(Integer loginTimeout);
 
       abstract Builder setSsl(Boolean ssl);
 
-      abstract Builder setValidate(Boolean validate);
-
       abstract Builder setDataSource(DataSource dataSource);
 
       abstract DataSourceConfiguration build();
     }
 
+    public static DataSourceConfiguration create() {
+      return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder().build();
+    }
+
     /**
      * Creates {@link DataSourceConfiguration} from existing instance of {@link DataSource}.
      *
-     * @param dataSource an instance of {@link DataSource}.
+     * @param dataSource - an instance of {@link DataSource}.
      */
     public static DataSourceConfiguration create(DataSource dataSource) {
       checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
       return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder()
-          .setValidate(true)
           .setDataSource(dataSource)
           .build();
     }
 
     /**
-     * Creates {@link DataSourceConfiguration} from instance of {@link SnowflakeCredentials}.
+     * Sets username/password authentication.
      *
-     * @param credentials an instance of {@link SnowflakeCredentials}.
+     * @param username - Snowflake username.
+     * @param password - Password for provided Snowflake username.
      */
-    public static DataSourceConfiguration create(SnowflakeCredentials credentials) {
-      if (credentials instanceof UsernamePasswordSnowflakeCredentials) {
-        return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder()
-            .setValidate(true)
-            .setUsername(((UsernamePasswordSnowflakeCredentials) credentials).getUsername())
-            .setPassword(((UsernamePasswordSnowflakeCredentials) credentials).getPassword())
-            .build();
-      } else if (credentials instanceof OAuthTokenSnowflakeCredentials) {
-        return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder()
-            .setValidate(true)
-            .setOauthToken(((OAuthTokenSnowflakeCredentials) credentials).getToken())
-            .build();
-      } else if (credentials instanceof KeyPairSnowflakeCredentials) {
-        return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder()
-            .setValidate(true)
-            .setUsername(((KeyPairSnowflakeCredentials) credentials).getUsername())
-            .setPrivateKey(((KeyPairSnowflakeCredentials) credentials).getPrivateKey())
-            .build();
-      }
-      throw new IllegalArgumentException(
-          "Can't create DataSourceConfiguration from given credentials");
+    public DataSourceConfiguration withUsernamePasswordAuth(String username, String password) {
+      return builder()
+          .setUsername(ValueProvider.StaticValueProvider.of(username))
+          .setPassword(ValueProvider.StaticValueProvider.of(password))
+          .build();
+    }
+
+    /**
+     * Sets username/password authentication.
+     *
+     * @param username - Snowflake username.
+     * @param password - Password for provided Snowflake username.
+     */
+    public DataSourceConfiguration withUsernamePasswordAuth(
+        ValueProvider<String> username, ValueProvider<String> password) {
+      return builder().setUsername(username).setPassword(password).build();
+    }
+
+    /**
+     * Sets OAuth authentication.
+     *
+     * @param token - OAuth token.
+     */
+    public DataSourceConfiguration withOAuth(String token) {
+      return builder().setOauthToken(ValueProvider.StaticValueProvider.of(token)).build();
+    }
+
+    /**
+     * Sets OAuth authentication.
+     *
+     * @param token - OAuth token.
+     */
+    public DataSourceConfiguration withOAuth(ValueProvider<String> token) {
+      return builder().setOauthToken(token).build();
+    }
+
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param privateKey - Private key.
+     */
+    public DataSourceConfiguration withKeyPairAuth(String username, PrivateKey privateKey) {
+      return builder()
+          .setUsername(ValueProvider.StaticValueProvider.of(username))
+          .setPrivateKey(privateKey)
+          .build();
+    }
+
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param privateKeyPath - Private key path.
+     * @param privateKeyPassphrase - Passphrase for provided private key.
+     */
+    public DataSourceConfiguration withKeyPairPathAuth(
+        ValueProvider<String> username,
+        String privateKeyPath,
+        ValueProvider<String> privateKeyPassphrase) {
+      String privateKey = KeyPairUtils.readPrivateKeyFile(privateKeyPath);

Review comment:
       In this case, the key comes from a file path in the local machine at pipeline construction time. Is that right?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -1532,26 +1767,47 @@ public DataSource buildDatasource() {
         SnowflakeBasicDataSource basicDataSource = new SnowflakeBasicDataSource();
         basicDataSource.setUrl(buildUrl());
 
-        if (getUsername() != null) {
-          basicDataSource.setUser(getUsername());
+        if (isNotEmpty(getOauthToken())) {
+          basicDataSource.setOauthToken(getOauthToken().get());
+        } else if (isNotEmpty(getUsername()) && getPrivateKey() != null) {
+          basicDataSource.setUser(getUsername().get());
+          basicDataSource.setPrivateKey(getPrivateKey());
+        } else if (isNotEmpty(getUsername())
+            && isNotEmpty(getPrivateKeyPassphrase())
+            && isNotEmpty(getRawPrivateKey())) {
+          PrivateKey privateKey =
+              KeyPairUtils.preparePrivateKey(
+                  getRawPrivateKey().get(), getPrivateKeyPassphrase().get());
+          basicDataSource.setPrivateKey(privateKey);
+          basicDataSource.setUser(getUsername().get());
+
+        } else if (isNotEmpty(getUsername()) && isNotEmpty(getPassword())) {
+          basicDataSource.setUser(getUsername().get());
+          basicDataSource.setPassword(getPassword().get());
+        } else {
+          throw new RuntimeException("Missing credentials values. Please check your credentials");

Review comment:
       populateDisplayData is called at pipeline construction time. Since you're adding support for ValueProviders, you're enabling users to fill in the data for their credentials AFTER construction time. Imagine you're building a template. In that case, none of the credentials will be known.
   
   Therefore, I think you likely don't want to throw this exception.
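
One way to read this suggestion, sketched under stated assumptions: `DeferredValue` is a hypothetical stand-in for Beam's `ValueProvider` (only `get` and `isAccessible`), and `CredentialCheckSketch` with its methods is an illustrative name, not code from the pull request. The check only throws when every value is already known, and otherwise reports that validation was deferred.

```java
// Hypothetical stand-in for Beam's ValueProvider (only get and isAccessible).
interface DeferredValue<T> {
  T get();

  boolean isAccessible();
}

final class CredentialCheckSketch {
  // Known at construction time, similar in spirit to a static value provider.
  static DeferredValue<String> fixed(String value) {
    return new DeferredValue<String>() {
      @Override
      public String get() {
        return value;
      }

      @Override
      public boolean isAccessible() {
        return true;
      }
    };
  }

  // Unknown until the job runs, as when building a template.
  static DeferredValue<String> runtime() {
    return new DeferredValue<String>() {
      @Override
      public String get() {
        throw new IllegalStateException("Value only available at runtime");
      }

      @Override
      public boolean isAccessible() {
        return false;
      }
    };
  }

  // True only if every non-null value can be read right now.
  static boolean allAccessible(DeferredValue<?>... values) {
    for (DeferredValue<?> value : values) {
      if (value != null && !value.isAccessible()) {
        return false;
      }
    }
    return true;
  }

  private static boolean missing(DeferredValue<String> value) {
    return value == null || value.get() == null || value.get().isEmpty();
  }

  // Returns true if the credentials were validated, false if the check was deferred.
  static boolean validateUsernamePassword(
      DeferredValue<String> username, DeferredValue<String> password) {
    if (!allAccessible(username, password)) {
      return false; // Template construction: credentials are not known yet, so do not throw.
    }
    if (missing(username) || missing(password)) {
      throw new IllegalArgumentException("Missing credentials values");
    }
    return true;
  }
}
```

The same check can then run again at execution time, when all values are accessible and a missing credential is a real error.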




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 477350)
    Time Spent: 3h 20m  (was: 3h 10m)

> Add support for Dataflow Templates
> ----------------------------------
>
>                 Key: BEAM-10669
>                 URL: https://issues.apache.org/jira/browse/BEAM-10669
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas
>            Reporter: Kasia Kucharczyk
>            Assignee: Kasia Kucharczyk
>            Priority: P2
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> It is required to change arguments to ValueProvider type and move all functionalities of those parameters to extend methods.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)