Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2024/03/29 21:46:43 UTC

[PR] [Java] ManagedIO [beam]

ahmedabu98 opened a new pull request, #30808:
URL: https://github.com/apache/beam/pull/30808

   Managed transforms API for Java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548488911


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   In the interest of getting this PR in on time, I'm +1 on having a static list for now. 
   
   This looks like a bigger design question about how best to maintain a list of supported identifiers that can be accessed from multiple SDKs, and whether we want this list living inside the repo or outside.





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546828621


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG

Review Comment:
   I added a pattern per transform operation type (e.g. read, write) and used it to filter out identifiers that don't fit the pattern. 
   
   I'm noticing Managed.read() and Managed.write() do the exact same thing, except:
   - they filter out SchemaTransforms according to different patterns
   - small API difference: `read().from(<identifier>)` vs `write().to(<identifier>)`. Could be resolved using `via(<identifier>)` or something to that effect
   
   Having `.read()` and `.write()` makes the API more user-friendly, but beyond that I'm wondering whether we have another reason for the semantic difference.
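
The per-operation filtering described in this comment can be sketched as follows. This is a minimal illustration and not the PR's actual code: the class name `IdentifierFilterSketch` and the `filter` helper are hypothetical, the read pattern has the same shape as the one quoted elsewhere in this thread, and the write pattern is assumed by analogy.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class IdentifierFilterSketch {
  // Same shape as the read pattern quoted in this thread.
  static final Pattern READ_PATTERN =
      Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");
  // Assumption: the write pattern mirrors the read pattern.
  static final Pattern WRITE_PATTERN =
      Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_write[\\w-]*:[\\w-]+");

  /** Keeps only the identifiers that fully match the given operation pattern. */
  static List<String> filter(List<String> identifiers, Pattern pattern) {
    return identifiers.stream()
        .filter(id -> pattern.matcher(id).matches())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> all =
        List.of(
            "beam:schematransform:org.apache.beam:iceberg_read:v1",
            "beam:schematransform:org.apache.beam:iceberg_write:v1");
    System.out.println(filter(all, READ_PATTERN));
    System.out.println(filter(all, WRITE_PATTERN));
  }
}
```

With this shape, the only difference between the read and write paths is which pattern is passed in, which is the duplication the comment points out.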





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548319363


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   I suppose that's an option (though we'd have to do this per language). Is it bad, however, that a user could pass in a URN that's not managed and expect it to be managed? (I suppose this is a meta-question of who maintains this virtual list and what the actual contract is...)





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 merged PR #30808:
URL: https://github.com/apache/beam/pull/30808




Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546638634


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -58,7 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@AutoService(SchemaTransformProvider.class)
+@AutoService(ManagedSchemaTransformProvider.class)

Review Comment:
   I think IOs can just use SchemaTransformProvider.





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554614494


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   And will return a single class `ManagedTransform` instead of `Read` and `Write`.
   
   
   `ManagedTransform managedTransform = Managed.read(ICEBERG).withConfig(configMap)`
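
One way to picture this suggestion is the sketch below, in which `read` and `write` share a single return type. It is a simplified, Beam-free illustration: `ManagedSketch`, `ManagedTransform`, and `lookup` are hypothetical names, the real class would presumably extend `SchemaTransform`, and the identifier strings are copied from the `TRANSFORMS` maps in the quoted diff.

```java
import java.util.Locale;
import java.util.Map;

class ManagedSketch {
  static final String ICEBERG = "iceberg";

  // Identifier strings copied from the TRANSFORMS maps quoted in this thread.
  static final Map<String, String> READ_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
  static final Map<String, String> WRITE_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1");

  /** Hypothetical single return type shared by read() and write(). */
  static final class ManagedTransform {
    final String identifier;
    final Map<String, Object> config;

    ManagedTransform(String identifier, Map<String, Object> config) {
      this.identifier = identifier;
      this.config = Map.copyOf(config); // defensive, immutable copy
    }

    ManagedTransform withConfig(Map<String, Object> newConfig) {
      return new ManagedTransform(identifier, newConfig);
    }
  }

  static ManagedTransform read(String source) {
    return lookup(READ_TRANSFORMS, source, "source");
  }

  static ManagedTransform write(String sink) {
    return lookup(WRITE_TRANSFORMS, sink, "sink");
  }

  // Shared lookup: the read and write paths differ only in the map they consult.
  private static ManagedTransform lookup(
      Map<String, String> transforms, String name, String kind) {
    String identifier = transforms.get(name.toLowerCase(Locale.ROOT));
    if (identifier == null) {
      throw new IllegalArgumentException(
          "Unsupported " + kind + " '" + name + "'; expected one of " + transforms.keySet());
    }
    return new ManagedTransform(identifier, Map.of());
  }

  public static void main(String[] args) {
    ManagedTransform t = read(ICEBERG).withConfig(Map.of("foo", "abc", "bar", 123));
    System.out.println(t.identifier + " " + t.config);
  }
}
```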





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548051734


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =
+        Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");
+
+    abstract String getSource();
+
+    abstract Pattern getPattern();

Review Comment:
   As cham mentions below, it's to get identifiers that match a pattern (e.g. "_read" for `Managed.read()`), so users wouldn't be able to do something like `Managed.write().to(<kafka read>)`.
   
   > Is Pattern even a valid schema field type
   
   Thanks for catching that; yes, this would be problematic for cross-language use. I will make it a string here instead.
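
A minimal sketch of the "store it as a string" idea, assuming the regular expression is kept as a plain `String` field and compiled only where it is used. `StringPatternSketch` and `Config` are hypothetical names; in Beam the holder would presumably be an AutoValue configuration class.

```java
import java.util.regex.Pattern;

class StringPatternSketch {
  /** Hypothetical config holder that carries the regex as a String. */
  static final class Config {
    final String identifierPattern;

    Config(String identifierPattern) {
      // Compile once up front so an invalid regex fails at construction time.
      Pattern.compile(identifierPattern);
      this.identifierPattern = identifierPattern;
    }

    /** Compiles the stored string on demand and tests the identifier against it. */
    boolean accepts(String identifier) {
      return Pattern.compile(identifierPattern).matcher(identifier).matches();
    }
  }

  public static void main(String[] args) {
    Config readConfig =
        new Config("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");
    System.out.println(
        readConfig.accepts("beam:schematransform:org.apache.beam:iceberg_read:v1"));
    System.out.println(
        readConfig.accepts("beam:schematransform:org.apache.beam:iceberg_write:v1"));
  }
}
```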





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548048553


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   Well, a benefit of explicitly listing them out is to have a defined scope of what is accessible via `Managed.read()` vs `Managed.write()`





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546621758


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG
+  }
+
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    private final Map<IO, String> identifiers =
+        ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
+
+    abstract IO getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(IO source);
+
+      abstract Builder setConfig(String config);
+
+      abstract Builder setConfigUrl(String configUrl);
+
+      abstract Read build();
+    }
+
+    public Read from(IO source) {
+      return toBuilder().setSource(source).build();
+    }
+
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    public Read withConfig(String config) {
+      return toBuilder().setConfig(config).build();
+    }
+
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(mapToYamlString(config)).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      String underlyingTransformIdentifier = identifiers.get(getSource());
+
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setIdentifier(underlyingTransformIdentifier)
+              .setType(READ)
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of().from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  public static Write write() {
+    return new AutoValue_Managed_Write.Builder().build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {

Review Comment:
   Ditto (similar changes as `Read`).



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG
+  }
+
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    private final Map<IO, String> identifiers =
+        ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
+
+    abstract IO getSource();

Review Comment:
   Is this needed? We probably should not introduce a new concept, `IO`, to the API.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG

Review Comment:
   Can we do this using discovery instead of listing specific IO connectors? For example, maintain a Schema-Transform pattern within each TransformGroup (for example, `beam:schematransform:read:.*`) and add transforms discovered via AutoValue that match the pattern. If we can't develop a pattern for a group, we can maintain a list.
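
A rough sketch of discovery-plus-pattern, assuming providers are found through `java.util.ServiceLoader` (the mechanism behind the `META-INF/services` entries that `@AutoService` generates). The `Provider` interface here is a hypothetical stand-in that models only `identifier()`; `DiscoverySketch`, `matching`, and `named` are illustrative names rather than Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.regex.Pattern;

class DiscoverySketch {
  /** Hypothetical stand-in for a schema-transform provider. */
  interface Provider {
    String identifier();
  }

  /** Small helper for building fake providers in examples. */
  static Provider named(String identifier) {
    return () -> identifier;
  }

  /** Returns the identifiers of all providers whose identifier matches the regex. */
  static List<String> matching(Iterable<? extends Provider> providers, String regex) {
    Pattern pattern = Pattern.compile(regex);
    List<String> result = new ArrayList<>();
    for (Provider provider : providers) {
      if (pattern.matcher(provider.identifier()).matches()) {
        result.add(provider.identifier());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // No providers are registered in this standalone file, so this list is empty.
    System.out.println(matching(ServiceLoader.load(Provider.class), ".*_read.*"));

    // Fake providers show the filtering itself.
    List<Provider> fakes =
        List.of(
            named("beam:schematransform:org.apache.beam:iceberg_read:v1"),
            named("beam:schematransform:org.apache.beam:iceberg_write:v1"));
    System.out.println(matching(fakes, ".*_read.*"));
  }
}
```

The fallback mentioned in the comment, a maintained list, would replace the regex check with a set-membership check while keeping the same loop.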



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG
+  }
+
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    private final Map<IO, String> identifiers =
+        ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
+
+    abstract IO getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(IO source);
+
+      abstract Builder setConfig(String config);
+
+      abstract Builder setConfigUrl(String configUrl);
+
+      abstract Read build();
+    }
+
+    public Read from(IO source) {

Review Comment:
   I think this should be `from(String urn)` where `urn` is the schema-transform identifier.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+
+@AutoService(SchemaTransformProvider.class)
+public class ManagedSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:managed:v1";
+  }
+
+  private Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>();
+
+  private ManagedSchemaTransformProvider() {
+    try {
+      for (SchemaTransformProvider schemaTransformProvider :
+          ServiceLoader.load(ManagedSchemaTransformProvider.class)) {

Review Comment:
   We should look for `SchemaTransformProvider`s here, not `ManagedSchemaTransformProvider`s. (The latter is already loaded, BTW.)
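
The lookup this comment asks for can be sketched without Beam on the classpath. `Provider` below is a hypothetical stand-in for `SchemaTransformProvider`, and `ProviderIndex` is an illustrative name; in the real constructor the iterable would be `ServiceLoader.load(SchemaTransformProvider.class)`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: index providers by identifier and reject duplicates. */
final class ProviderIndex {
  /** Stand-in for Beam's SchemaTransformProvider; only the identifier matters here. */
  interface Provider {
    String identifier();
  }

  /** Builds an identifier-to-provider map, failing on the first duplicate identifier. */
  static Map<String, Provider> index(Iterable<? extends Provider> providers) {
    Map<String, Provider> byId = new HashMap<>();
    for (Provider provider : providers) {
      // putIfAbsent returns the previous mapping, so non-null means a duplicate.
      if (byId.putIfAbsent(provider.identifier(), provider) != null) {
        throw new IllegalArgumentException(
            "Found multiple providers with the same identifier " + provider.identifier());
      }
    }
    return byId;
  }
}
```

Loading the base interface is what makes other connectors visible; loading the managed provider's own class would only ever find providers of that one type.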



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG
+  }
+
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    private final Map<IO, String> identifiers =
+        ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
+
+    abstract IO getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(IO source);
+
+      abstract Builder setConfig(String config);
+
+      abstract Builder setConfigUrl(String configUrl);
+
+      abstract Read build();
+    }
+
+    public Read from(IO source) {
+      return toBuilder().setSource(source).build();
+    }
+
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    public Read withConfig(String config) {
+      return toBuilder().setConfigUrl(config).build();
+    }
+
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfigUrl(mapToYamlString(config)).build();

Review Comment:
   This should call `setConfig()`. The config URL is a URL from which the config is downloaded.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";

Review Comment:
   Probably move this to an enum, say `TransformGroup`.
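
One possible shape for such an enum, as a sketch only. The `fromString` helper is an assumption added for illustration and does not appear in the PR.

```java
import java.util.Locale;

/** Hypothetical replacement for the READ and WRITE string constants. */
enum TransformGroup {
  READ,
  WRITE;

  /** Parses a group name case-insensitively; throws IllegalArgumentException if unknown. */
  static TransformGroup fromString(String name) {
    return valueOf(name.trim().toUpperCase(Locale.ROOT));
  }
}
```

An enum lets the compiler reject misspelled groups, which plain string constants cannot do.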





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548328971


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   Good point. I wonder if we can use codegen to generate this list of constants from the config (the same config that we use to limit the list of transforms in Read/Write, etc.).
   
   That will also allow us to make this consistent across languages (by modifying the toolchain accordingly).





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1552590182


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following source: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String source);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    public Read withConfigUrl(String configUrl) {

Review Comment:
   Please add Javadoc for the `withConfig` methods.
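
A sketch of what such Javadoc might say, written against a Beam-free stand-in. `ReadConfigSketch` is hypothetical, and the wording assumes that the config is YAML text (the diff converts maps with `YamlUtils.yamlStringFromMap`) and that the URL points to a file holding the same kind of content.

```java
/** Hypothetical stand-in for Managed.Read, showing only the documented config methods. */
final class ReadConfigSketch {
  private final String config; // inline YAML, or null if unset
  private final String configUrl; // location of a YAML file, or null if unset

  ReadConfigSketch(String config, String configUrl) {
    this.config = config;
    this.configUrl = configUrl;
  }

  /**
   * Returns a copy configured with an inline YAML string.
   *
   * @param config the transform configuration as YAML text, not a location
   */
  ReadConfigSketch withConfig(String config) {
    return new ReadConfigSketch(config, this.configUrl);
  }

  /**
   * Returns a copy configured with the URL of a YAML configuration file.
   *
   * @param configUrl the location from which the configuration is downloaded
   */
  ReadConfigSketch withConfigUrl(String configUrl) {
    return new ReadConfigSketch(this.config, configUrl);
  }

  String config() {
    return config;
  }

  String configUrl() {
    return configUrl;
  }
}
```

Spelling out the difference in the Javadoc guards against passing inline YAML to the URL setter, or the reverse.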



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following source: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String source);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    public Read withConfig(String config) {
+      return toBuilder().setConfig(config).build();
+    }
+
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  public static Write write(String sink) {

Review Comment:
   Ditto regarding Javadoc.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+@AutoService(SchemaTransformProvider.class)
+class ManagedSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:managed:v1";
+  }
+
+  private final Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>();
+
+  private ManagedSchemaTransformProvider(Collection<String> identifiers) {
+    try {
+      for (SchemaTransformProvider schemaTransformProvider :
+          ServiceLoader.load(SchemaTransformProvider.class)) {
+        if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) {
+          throw new IllegalArgumentException(
+              "Found multiple SchemaTransformProvider implementations with the same identifier "
+                  + schemaTransformProvider.identifier());
+        }
+        schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
+    schemaTransformProviders.entrySet().removeIf(e -> !identifiers.contains(e.getKey()));
+  }
+
+  private static @Nullable ManagedSchemaTransformProvider managedProvider = null;
+
+  public static ManagedSchemaTransformProvider of(Collection<String> supportedIdentifiers) {
+    if (managedProvider == null) {
+      managedProvider = new ManagedSchemaTransformProvider(supportedIdentifiers);
+    }
+    return managedProvider;
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ManagedConfig {

Review Comment:
   Please make this class and its methods package-private.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {

Review Comment:
   Please add Javadoc and clarify what can be specified as the source by explicitly listing the supported values (and refer to the constants defined above).



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";

Review Comment:
   Add a comment that this will be moved into a dynamically generated list in the future.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following source: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String source);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    public Read withConfig(String config) {
+      return toBuilder().setConfig(config).build();
+    }
+
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String source);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    public Write withConfigUrl(String configUrl) {

Review Comment:
   Ditto.



##########
sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java:
##########
@@ -0,0 +1,2 @@
+package org.apache.beam.sdk.managed;public class ManagedTest {
+}

Review Comment:
   Add tests?
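
The behaviors such tests might pin down can be sketched without JUnit or Beam. `SourceLookup` is a hypothetical stand-in for the lookup in `Managed.read(String)` as it appears in this revision of the diff; the error message is abbreviated, and `Locale.ROOT` is an addition the diff does not have.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the source lookup performed by Managed.read(String). */
final class SourceLookup {
  // Mirrors the single entry of Read.TRANSFORMS in the diff under review.
  static final Map<String, String> TRANSFORMS =
      Collections.singletonMap("iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1");

  /** Resolves a source name to its identifier, ignoring case; rejects unknown names. */
  static String resolve(String source) {
    String identifier = TRANSFORMS.get(source.toLowerCase(Locale.ROOT));
    if (identifier == null) {
      // Preconditions.checkNotNull in the diff also fails with a NullPointerException.
      throw new NullPointerException(
          "An unsupported source was specified: '" + source + "'. Supported: " + TRANSFORMS.keySet());
    }
    return identifier;
  }

  /** Returns true if resolve() rejects the given name. */
  static boolean rejects(String source) {
    try {
      resolve(source);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }
}
```

Two cases seem worth covering: a supported name resolves regardless of case, and an unsupported name fails with a message that lists the supported ones.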





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554714833


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)

Review Comment:
   Thanks for the catch



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)

Review Comment:
   Done





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548031412


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   I think the benefit of listing out the set of SchemaTransforms is to have a mapping from user-friendly name to URN:
   
   ```python
   {
     "ICEBERG": "beam:schematransform:org.apache.beam:iceberg_read:v1", 
     ...
   }
   ```
   
   If instead the API looks like `Managed.read().from(<urn>)` (i.e., if the user explicitly specifies the URN instead of `"iceberg"`), we might not need to list the set explicitly, because essentially any SchemaTransform is eligible to be "managed".
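
The trade-off described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the PR's implementation: the class `ManagedNameLookupSketch` and the method `resolveReadUrn` are hypothetical names, and only the `iceberg` entry and its URN are taken from the thread.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of a friendly-name-to-URN lookup; not part of the Beam API. */
final class ManagedNameLookupSketch {
  // Friendly name -> SchemaTransform URN. Only this entry appears in the thread.
  static final Map<String, String> READ_TRANSFORMS =
      Collections.singletonMap(
          "iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1");

  /** Resolves a case-insensitive friendly name to its URN, or fails listing the supported names. */
  static String resolveReadUrn(String source) {
    String urn = READ_TRANSFORMS.get(source.toLowerCase(Locale.ROOT));
    if (urn == null) {
      throw new IllegalArgumentException(
          String.format(
              "An unsupported source was specified: '%s'. Supported sources: %s",
              source, READ_TRANSFORMS.keySet()));
    }
    return urn;
  }
}
```

With a map like this, a caller can write the short name in any letter case and gets an early, descriptive failure for anything unlisted. If callers passed the URN directly, neither the map nor the check would be needed, which is the alternative the comment raises.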





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554605538


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   I think Robert's suggestion was to keep `Managed.read()` and `Managed.write()` as constructors but merge `Read` and `Write` classes (so the API shouldn't change). I'm +1 to that.
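
One way to picture that suggestion is the sketch below. It is an assumption-laden illustration, not the code that was committed: `ManagedTransformSketch`, `getIdentifier`, and the private `lookup` helper are hypothetical names, the Beam base class and config handling are omitted, and only the two Iceberg URNs come from the diff.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: one transform class behind separate read/write factory methods. */
final class ManagedTransformSketch {
  static final Map<String, String> READ_TRANSFORMS =
      Collections.singletonMap(
          "iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1");
  static final Map<String, String> WRITE_TRANSFORMS =
      Collections.singletonMap(
          "iceberg", "beam:schematransform:org.apache.beam:iceberg_write:v1");

  private final String identifier;

  private ManagedTransformSketch(String identifier) {
    this.identifier = identifier;
  }

  /** Entry point for sources; callers still write read(...). */
  static ManagedTransformSketch read(String source) {
    return new ManagedTransformSketch(lookup(READ_TRANSFORMS, source, "source"));
  }

  /** Entry point for sinks; callers still write write(...). */
  static ManagedTransformSketch write(String sink) {
    return new ManagedTransformSketch(lookup(WRITE_TRANSFORMS, sink, "sink"));
  }

  String getIdentifier() {
    return identifier;
  }

  // Shared lookup: the only difference between read and write is which map is consulted.
  private static String lookup(Map<String, String> supported, String name, String kind) {
    String urn = supported.get(name.toLowerCase(Locale.ROOT));
    if (urn == null) {
      throw new IllegalArgumentException(
          String.format(
              "An unsupported %s was specified: '%s'. Supported: %s",
              kind, name, supported.keySet()));
    }
    return urn;
  }
}
```

The two factories keep the user-facing calls unchanged, while the duplicated builder, config setters, and expansion logic would live in a single class.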





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554366451


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);

Review Comment:
   Ack. It's fine since this is not in the public API.





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554715077


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   Yep, thanks for clarifying that. Just committed those changes. @chamikaramj @robertwb, let me know if this is what you had in mind.





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554614494


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   And will return a single class `ManagedTransform` instead of `Read` and `Write`.
   
   
   `ManagedTransform managedTransform = Managed.read(ICEBERG).withConfig(configMap)`
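   
   A minimal sketch of that shape, using a hypothetical stand-in class (`ManagedTransformSketch`, not Beam's actual `ManagedTransform`) with no Beam dependencies. Both factory methods return the same type and differ only in which identifier map they consult. The identifier strings are the ones from the `TRANSFORMS` maps in the diff above; throwing `IllegalArgumentException` is an assumption of this sketch:

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for illustration; not the class in the PR.
final class ManagedTransformSketch {
  static final String ICEBERG = "iceberg";

  // Friendly name -> SchemaTransform identifier, as in the TRANSFORMS maps above.
  static final Map<String, String> READ_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
  static final Map<String, String> WRITE_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1");

  final String identifier;
  final Map<String, Object> config;

  private ManagedTransformSketch(String identifier, Map<String, Object> config) {
    this.identifier = identifier;
    this.config = config;
  }

  // read() and write() return the same type; only the lookup table differs.
  static ManagedTransformSketch read(String source) {
    return create(READ_TRANSFORMS, source);
  }

  static ManagedTransformSketch write(String sink) {
    return create(WRITE_TRANSFORMS, sink);
  }

  private static ManagedTransformSketch create(Map<String, String> transforms, String name) {
    String identifier = transforms.get(name.toLowerCase(Locale.ROOT));
    if (identifier == null) {
      throw new IllegalArgumentException(
          "Unsupported name '" + name + "'. Supported: " + transforms.keySet());
    }
    return new ManagedTransformSketch(identifier, Map.of());
  }

  // Returns a new instance instead of mutating, mirroring the toBuilder() style above.
  ManagedTransformSketch withConfig(Map<String, Object> config) {
    return new ManagedTransformSketch(identifier, Map.copyOf(config));
  }
}
```

   Callers then hold one type regardless of direction, e.g. `ManagedTransformSketch t = ManagedTransformSketch.read(ManagedTransformSketch.ICEBERG).withConfig(configMap)`.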





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1555022682


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);

Review Comment:
   The builder methods for `ManagedTransform` are package-private, so this shouldn't be visible externally





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554043888


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);

Review Comment:
   I suggest removing this method and making `setConfigUrl()` support local files, so that anybody who has the YAML file locally can use it.
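   
   A rough sketch of what "support local files" could look like, using only `java.nio` rather than Beam's own file-handling utilities. The class and method names (`ConfigLocationSketch`, `readConfig`) are hypothetical, and only plain paths and absolute `file:` URIs are handled; other schemes are out of scope here:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of the PR.
final class ConfigLocationSketch {
  // Accepts either a plain local path or an absolute file: URI and returns the file's text.
  static String readConfig(String location) {
    Path path =
        location.startsWith("file:") ? Paths.get(URI.create(location)) : Paths.get(location);
    try {
      return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException("Could not read config at " + location, e);
    }
  }
}
```

   The returned string would be the YAML text that the existing config handling already knows how to parse.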





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554617595


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)

Review Comment:
   This produces a `Transform` object, not a `PCollectionRowTuple`, until we `apply` it.





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554339917


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {

Review Comment:
   Done





Re: [PR] [Java] ManagedIO [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #30808:
URL: https://github.com/apache/beam/pull/30808#issuecomment-2032957731

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1552052753


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   Robert, are you OK with keeping this as a static list for the initial PR and making it dynamic (maybe via codegen) in the future?
   
   I think a dynamic list will only be helpful if we can drop new jars, containing SchemaTransforms that we would like to manage, into an older Beam version. At least initially, we will be very aware of the set of transforms we support (and the list will be small).





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548099563


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   I don't think the user will want to manually specify the URN, but rather something more user-friendly.





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548098968


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {

Review Comment:
   OK, that sounds fine. 





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548287322


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   We can add constants so that customers do not have to remember URNs (or rather, SchemaTransform identifiers). The call would then take the form:
   
   `p.apply(Managed.read().for(Managed.ReadIds.ICEBERG).withConfig(config))`
   
   I also changed `from` to `for` here, which sounds more correct.
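   
   One caveat on the name: `for` is a reserved word in Java, so a method literally called `for` would not compile. A minimal sketch of the same call shape, with the hypothetical name `forId` standing in and stand-in classes instead of the real `Managed` (the identifier string is the one from the `TRANSFORMS` map quoted elsewhere in this thread):

```java
import java.util.Map;

// Hypothetical stand-in for illustration; not the Managed class from the PR.
final class ManagedReadSketch {
  // Constants so callers never type the identifier by hand.
  static final class ReadIds {
    static final String ICEBERG = "beam:schematransform:org.apache.beam:iceberg_read:v1";
  }

  final String identifier; // null until forId(...) has been called
  final Map<String, Object> config;

  private ManagedReadSketch(String identifier, Map<String, Object> config) {
    this.identifier = identifier;
    this.config = config;
  }

  static ManagedReadSketch read() {
    return new ManagedReadSketch(null, Map.of());
  }

  // Named forId because `for` is a Java keyword and cannot be a method name.
  ManagedReadSketch forId(String identifier) {
    return new ManagedReadSketch(identifier, config);
  }

  ManagedReadSketch withConfig(Map<String, Object> config) {
    return new ManagedReadSketch(identifier, Map.copyOf(config));
  }
}
```

   With this, the chain reads `ManagedReadSketch.read().forId(ManagedReadSketch.ReadIds.ICEBERG).withConfig(config)`.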
   
   WDYT ?
   
   





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546987587


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   The idea was to dynamically discover the `SchemaTransform`s that map to `read()`/`write()` etc. by using a pattern that matches their identifiers. The other option would be to explicitly list the set of `SchemaTransform`s.
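
To make the two options concrete, here is a rough sketch that uses only `java.util.regex`. The class name, the regex, and the assumed `_read:v<N>` naming convention are illustrative assumptions, not the pattern in this PR; the only identifier taken from this thread is the `iceberg_read` one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

final class IdentifierDiscoverySketch {
  // Assumed convention: read transforms have identifiers ending in "_read:v<N>".
  static final Pattern READ_PATTERN =
      Pattern.compile("beam:schematransform:org\\.apache\\.beam:[\\w-]+_read:v\\d+");

  // Option 2: an explicit, hand-maintained list of supported identifiers.
  static final List<String> EXPLICIT_READS =
      Collections.singletonList("beam:schematransform:org.apache.beam:iceberg_read:v1");

  private IdentifierDiscoverySketch() {}

  // Option 1: keep only the registered identifiers that fully match the pattern.
  static List<String> discover(List<String> registered, Pattern pattern) {
    List<String> matches = new ArrayList<>();
    for (String id : registered) {
      if (pattern.matcher(id).matches()) {
        matches.add(id);
      }
    }
    return matches;
  }
}
```

Pattern-based discovery picks up new transforms without code changes, while the explicit list makes the supported set reviewable at a glance.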





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546988939


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   `p.apply(Managed.read().from(urn).withConfig(config))`





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548330330


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   (though, for the first version, I'll be fine with a static list since we'll just have ICEBERG).





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546828621


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG

Review Comment:
   I added a pattern per transform operation type (e.g. read, write) and used it to filter out identifiers that don't fit the pattern. 
   
   I'm noticing Managed.read() and Managed.write() do the exact same thing, except:
   - they filter out SchemaTransforms according to different patterns
   - small API difference: `read().from(<identifier>)` vs `write().to(<identifier>)`. Could be resolved using `via(<identifier>)` or something to that effect
   
   Having `.read()` and `.write()` makes it more user-friendly, but besides that I'm wondering if we have a reason for the semantic difference.
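
One way to picture the overlap: both entry points could delegate to a single helper and differ only in the lookup table they consult. This is a sketch under assumptions, not the PR's implementation. `ManagedLookupSketch` and its methods are hypothetical, and the `iceberg_write` identifier is assumed by analogy with the read identifier quoted in this thread.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

final class ManagedLookupSketch {
  static final Map<String, String> READ_TRANSFORMS =
      Collections.singletonMap("iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1");

  // Assumed by analogy with the read identifier; not confirmed in this thread.
  static final Map<String, String> WRITE_TRANSFORMS =
      Collections.singletonMap("iceberg", "beam:schematransform:org.apache.beam:iceberg_write:v1");

  private ManagedLookupSketch() {}

  static String read(String name) {
    return resolve(READ_TRANSFORMS, name);
  }

  static String write(String name) {
    return resolve(WRITE_TRANSFORMS, name);
  }

  // Shared logic: case-insensitive lookup with an error that lists the supported names.
  private static String resolve(Map<String, String> table, String name) {
    String id = table.get(name.toLowerCase(Locale.ROOT));
    if (id == null) {
      throw new IllegalArgumentException(
          "Unsupported name '" + name + "'. Supported names: " + table.keySet());
    }
    return id;
  }
}
```

Keeping separate `read` and `write` entry points costs two one-line methods here, so the user-facing distinction can stay even if the internals are shared.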





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546828621


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG

Review Comment:
   I added a pattern per transform operation type (e.g. read, write) and used it to filter out identifiers that don't fit the pattern. 
   
   I'm noticing Managed.read() and Managed.write() do the exact same thing, except:
   - they filter out SchemaTransforms according to different patterns
   - small API difference: `read().from(<identifier>)` vs `write().to(<identifier>)`. Could be resolved using `via(<identifier>)` or something to that effect
   
   Wondering if we need this semantic difference at this point 





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1552304475


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   `p.apply(Managed.read(ManagedIO.ICEBERG).withConfig(config))`





Re: [PR] [Java] ManagedIO [beam]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #30808:
URL: https://github.com/apache/beam/pull/30808#issuecomment-2039949330

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/30808?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 68.55%. Comparing base [(`fc5df6f`)](https://app.codecov.io/gh/apache/beam/commit/fc5df6f261bbe6ea910ffc1de8e6c093c9751c60?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`d4258cd`)](https://app.codecov.io/gh/apache/beam/pull/30808?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 25 commits behind head on master.
   
   > :exclamation: Current head d4258cd differs from pull request most recent head 02c04e6. Consider uploading reports for the commit 02c04e6 to get more accurate results
   
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #30808       +/-   ##
   =============================================
   - Coverage     70.73%   68.55%    -2.18%     
   + Complexity     4468     2985     -1483     
   =============================================
     Files          1256      352      -904     
     Lines        140774    27871   -112903     
     Branches       4306     3231     -1075     
   =============================================
   - Hits          99581    19108    -80473     
   + Misses        37714     7302    -30412     
   + Partials       3479     1461     -2018     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/30808/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [go](https://app.codecov.io/gh/apache/beam/pull/30808/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java](https://app.codecov.io/gh/apache/beam/pull/30808/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `68.55% <ø> (-0.03%)` | :arrow_down: |
   | [python](https://app.codecov.io/gh/apache/beam/pull/30808/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/30808?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   




Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1553785984


##########
sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java:
##########
@@ -0,0 +1,2 @@
+package org.apache.beam.sdk.managed;public class ManagedTest {
+}

Review Comment:
   Added



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String ICEBERG = "iceberg";

Review Comment:
   Done





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on PR #30808:
URL: https://github.com/apache/beam/pull/30808#issuecomment-2038454155

   Also, please fix spotless/lint failures.




Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554617836


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments using like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Map>.builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)

Review Comment:
   Ditto.





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1552055943


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   Sounds good. We can list them explicitly instead of trying to do pattern matching.





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548031412


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   I think the benefit of listing out the set of SchemaTransforms is to have a mapping from user-friendly name to identifier:
   
   ```python
   {
     "ICEBERG": "beam:schematransform:org.apache.beam:iceberg_read:v1", 
     ...
   }
   ```
   
   If instead the API looks like `Managed.read().from(<identifier>)` (i.e., if the user explicitly specifies the identifier instead of `"iceberg"`), we might not need to list the set explicitly, because essentially any SchemaTransform is eligible to be "managed".





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554343771


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);

Review Comment:
   This is included in [these lines](https://github.com/apache/beam/pull/30808/files#diff-666d799f826fb29116181c65747fcff142c9b2cef5cad804005b3f216ad46293R164-R168). There is also a test case for this [here](https://github.com/apache/beam/pull/30808/files#diff-5b10b7b441cda0a0625c14f543fae83ab78798d28a64fa121f1d1c532d331076R98). 
   
   We use FileSystems, so hopefully this will support more than LocalFileSystem? 





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554539093


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments using like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Map>.builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   Cham and I talked about this briefly in a [comment above](https://github.com/apache/beam/pull/30808#discussion_r1546828621). The concern was that there may be a reason for the Read and Write APIs to differ in the future.

   FWIW, I'm +1 on unifying them into a single API.





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554614494


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments using like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Map>.builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   And will return a single class `ManagedTransform` instead of `Read` and `Write`.
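
A rough sketch of what that unified shape could look like, assuming `read` and `write` keep their separate identifier maps but both return one class. This is not the PR's code: it drops the Beam and AutoValue dependencies so it stands alone, and `UnifiedManagedSketch`, the fields on `ManagedTransform`, and the `lookup` helper are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class UnifiedManagedSketch {
  public static final String ICEBERG = "iceberg";

  // These mirror the TRANSFORMS maps on Read and Write in the diff.
  static final Map<String, String> READ_TRANSFORMS =
      Collections.singletonMap(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
  static final Map<String, String> WRITE_TRANSFORMS =
      Collections.singletonMap(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1");

  /** Hypothetical single class standing in for both Read and Write. */
  static final class ManagedTransform {
    final String identifier;
    final Map<String, Object> config;

    ManagedTransform(String identifier, Map<String, Object> config) {
      this.identifier = identifier;
      this.config = config;
    }

    // Returns a copy carrying the given config, like withConfig(Map) in the diff.
    ManagedTransform withConfig(Map<String, Object> newConfig) {
      return new ManagedTransform(
          identifier, Collections.unmodifiableMap(new LinkedHashMap<>(newConfig)));
    }
  }

  static ManagedTransform read(String source) {
    return lookup(READ_TRANSFORMS, source, "source");
  }

  static ManagedTransform write(String sink) {
    return lookup(WRITE_TRANSFORMS, sink, "sink");
  }

  // Shared lookup: the only difference between read and write is the map consulted.
  private static ManagedTransform lookup(
      Map<String, String> transforms, String name, String kind) {
    String identifier = transforms.get(name.toLowerCase(Locale.ROOT));
    if (identifier == null) {
      throw new IllegalArgumentException(
          String.format(
              "An unsupported %s was specified: '%s'. Please specify one of: %s",
              kind, name, transforms.keySet()));
    }
    return new ManagedTransform(identifier, Collections.emptyMap());
  }

  public static void main(String[] args) {
    System.out.println(read("Iceberg").identifier);
    System.out.println(write(ICEBERG).identifier);
  }
}
```

If the two APIs do need to diverge later, the entry points stay separate, so only the returned type would have to split again.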





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554617595


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments using like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)

Review Comment:
   This produces a `PTransform` object, not a `PCollectionRowTuple`. We only get a `PCollectionRowTuple` once we `apply` it.
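
To make the distinction concrete without pulling in Beam, here is a small stand-alone sketch. `UppercaseTransform` is a hypothetical stand-in for a `PTransform` and a plain `List<String>` stands in for the `PCollectionRowTuple`; none of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

public class DeferredTransformSketch {

  /** Hypothetical stand-in for a PTransform: a description of work, not its result. */
  static final class UppercaseTransform implements Function<List<String>, List<String>> {
    int timesApplied = 0;

    @Override
    public List<String> apply(List<String> input) {
      timesApplied++;
      List<String> out = new ArrayList<>();
      for (String s : input) {
        out.add(s.toUpperCase(Locale.ROOT));
      }
      return out;
    }
  }

  public static void main(String[] args) {
    // Building the transform does no work and yields no output collection.
    UppercaseTransform transform = new UppercaseTransform();
    System.out.println(transform.timesApplied);

    // Output only exists once the transform is applied to an input.
    List<String> output = transform.apply(Arrays.asList("a", "b"));
    System.out.println(output);
  }
}
```

In the Javadoc example, the analogous fix would be to assign the result of applying the transform to an input `PCollectionRowTuple`, rather than the transform itself.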





Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1553784338


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+@AutoService(SchemaTransformProvider.class)
+class ManagedSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:managed:v1";
+  }
+
+  private final Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>();
+
+  private ManagedSchemaTransformProvider(Collection<String> identifiers) {
+    try {
+      for (SchemaTransformProvider schemaTransformProvider :
+          ServiceLoader.load(SchemaTransformProvider.class)) {
+        if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) {
+          throw new IllegalArgumentException(
+              "Found multiple SchemaTransformProvider implementations with the same identifier "
+                  + schemaTransformProvider.identifier());
+        }
+        schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
+    schemaTransformProviders.entrySet().removeIf(e -> !identifiers.contains(e.getKey()));
+  }
+
+  private static @Nullable ManagedSchemaTransformProvider managedProvider = null;
+
+  public static ManagedSchemaTransformProvider of(Collection<String> supportedIdentifiers) {
+    if (managedProvider == null) {
+      managedProvider = new ManagedSchemaTransformProvider(supportedIdentifiers);
+    }
+    return managedProvider;
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ManagedConfig {

Review Comment:
   `ServiceLoader` needs to instantiate every `SchemaTransformProvider` implementation, including this one, so this class needs to be public and have a public no-arg constructor.
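
One possible shape, sketched without Beam so it stands alone: a public no-arg constructor that delegates to a package-private one taking the identifiers. Everything here (`ProviderConstructorSketch`, `SketchProvider`, `PrivateOnlyProvider`, `isServiceLoadable`) is a hypothetical name for illustration, and the reflective check only approximates the two properties mentioned above.

```java
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ProviderConstructorSketch {

  /** Hypothetical provider shaped so that ServiceLoader could instantiate it. */
  public static class SketchProvider {
    private final Set<String> supportedIdentifiers;

    // Public no-arg constructor: the one a service loader would invoke.
    public SketchProvider() {
      this(Collections.emptySet());
    }

    // Package-private constructor for callers (e.g. tests) that restrict identifiers.
    SketchProvider(Collection<String> identifiers) {
      this.supportedIdentifiers = new HashSet<>(identifiers);
    }

    Set<String> supportedIdentifiers() {
      return supportedIdentifiers;
    }
  }

  /** Counter-example mirroring the diff: only a private constructor taking identifiers. */
  public static class PrivateOnlyProvider {
    private PrivateOnlyProvider(Collection<String> identifiers) {}
  }

  // True when the class is public and declares a public zero-argument constructor.
  static boolean isServiceLoadable(Class<?> type) {
    if (!Modifier.isPublic(type.getModifiers())) {
      return false;
    }
    try {
      type.getConstructor(); // getConstructor only finds public constructors
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isServiceLoadable(SketchProvider.class));
    System.out.println(isServiceLoadable(PrivateOnlyProvider.class));
  }
}
```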





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554044656


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);

Review Comment:
   (It should already support this through `LocalFileSystem`, so we probably just need a test?)





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1554437196


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);

Review Comment:
   It feels a bit odd for this to be part of the configuration that one can set externally, even just for testing.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1")
+            .build();
+
+    abstract String getSink();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSink(String sinkIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Write build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying sink.
+     * The map can ignore nullable parameters, but needs to include all required parameters. Check
+     * the underlying sink's configuration schema to see which parameters are available.
+     */
+    public Write withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Write withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Write withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()

Review Comment:
   In fact, it looks like you already have the generic one here. I don't think the ephemeral Read/Write classes pull their weight just to have `getSource()` and `getSink()` methods instead of, say, `getIdentifier()`.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a
+ * defined configuration. A given transform can be built with a {@code Map<String, Object>} that
+ * specifies arguments like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one can provide the
+ * location to a YAML file that contains this information. Say we have the following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl(<config path>);
+ * }</pre>
+ */
+public class Managed {
+
+  // TODO: Dynamically generate a list of supported transforms
+  public static final String ICEBERG = "iceberg";
+
+  /**
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+   * </ul>
+   */
+  public static Read read(String source) {
+
+    return new AutoValue_Managed_Read.Builder()
+        .setSource(
+            Preconditions.checkNotNull(
+                Read.TRANSFORMS.get(source.toLowerCase()),
+                "An unsupported source was specified: '%s'. Please specify one of the following sources: %s",
+                source,
+                Read.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    public static final Map<String, String> TRANSFORMS =
+        ImmutableMap.<String, String>builder()
+            .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+            .build();
+
+    abstract String getSource();
+
+    abstract @Nullable String getConfig();
+
+    abstract @Nullable String getConfigUrl();
+
+    abstract List<String> getSupportedIdentifiers();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSource(String sourceIdentifier);
+
+      abstract Builder setConfig(@Nullable String config);
+
+      abstract Builder setConfigUrl(@Nullable String configUrl);
+
+      abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
+
+      abstract Read build();
+    }
+
+    /**
+     * Use the input Map of configuration arguments to build and instantiate the underlying
+     * transform. The map can ignore nullable parameters, but needs to include all required
+     * parameters. Check the underlying transform's schema ({@link
+     * SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
+     */
+    public Read withConfig(Map<String, Object> config) {
+      return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+    }
+
+    /**
+     * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a
+     * specified YAML file location.
+     */
+    public Read withConfigUrl(String configUrl) {
+      return toBuilder().setConfigUrl(configUrl).build();
+    }
+
+    @VisibleForTesting
+    Read withSupportedIdentifiers(List<String> supportedIdentifiers) {
+      return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+          ManagedSchemaTransformProvider.ManagedConfig.builder()
+              .setTransformIdentifier(getSource())
+              .setConfig(getConfig())
+              .setConfigUrl(getConfigUrl())
+              .build();
+
+      SchemaTransform underlyingTransform =
+          new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+      return input.apply(underlyingTransform);
+    }
+  }
+
+  /**
+   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
+   * sinks are:
+   *
+   * <ul>
+   *   <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+   * </ul>
+   */
+  public static Write write(String sink) {
+    return new AutoValue_Managed_Write.Builder()
+        .setSink(
+            Preconditions.checkNotNull(
+                Write.TRANSFORMS.get(sink.toLowerCase()),
+                "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s",
+                sink,
+                Write.TRANSFORMS.keySet()))
+        .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values()))
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Write extends SchemaTransform {

Review Comment:
   This seems very redundant with `Read`. In fact, it seems there should just be a single ManagedTransform whose properties are the URN and config[Url], with `.read()` and `.write()` being just constructors that map an IO identifier to the right URN.
   
   (Bonus: if the constructor of ManagedTransform is package-private, you can test against it easily without having to expose odd hooks in the public API.)
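
   The single-transform shape suggested above could look roughly like the following. This is a minimal plain-Java sketch with no Beam dependencies, not the PR's actual code: `ManagedSketch`, `ManagedTransform`, `READ_TRANSFORMS`, `WRITE_TRANSFORMS`, and `lookup` are hypothetical names, and only the two URN strings and the `ICEBERG` constant come from the quoted diff.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/** Plain-Java sketch of one transform class with read()/write() as constructors. */
final class ManagedSketch {
  static final String ICEBERG = "iceberg";

  // Friendly name -> URN, one listing per operation (URN strings from the quoted diff).
  static final Map<String, String> READ_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1");
  static final Map<String, String> WRITE_TRANSFORMS =
      Map.of(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1");

  private ManagedSketch() {}

  /** Maps a source name to its read URN. */
  static ManagedTransform read(String source) {
    return new ManagedTransform(lookup(READ_TRANSFORMS, source, "source"), null, null);
  }

  /** Maps a sink name to its write URN. */
  static ManagedTransform write(String sink) {
    return new ManagedTransform(lookup(WRITE_TRANSFORMS, sink, "sink"), null, null);
  }

  private static String lookup(Map<String, String> listing, String name, String kind) {
    String urn = listing.get(name.toLowerCase(Locale.ROOT));
    if (urn == null) {
      throw new IllegalArgumentException(
          "An unsupported " + kind + " was specified: '" + name
              + "'. Supported: " + listing.keySet());
    }
    return urn;
  }

  /** The only state is the URN plus an inline config or a config URL. */
  static final class ManagedTransform {
    private final String urn;
    private final String config; // inline YAML, may be null
    private final String configUrl; // location of a YAML file, may be null

    // Package-private: tests in the same package can pass any URN directly,
    // so no test-only setter has to appear in the public API.
    ManagedTransform(String urn, String config, String configUrl) {
      this.urn = Objects.requireNonNull(urn, "urn");
      this.config = config;
      this.configUrl = configUrl;
    }

    String getUrn() {
      return urn;
    }

    String getConfig() {
      return config;
    }

    String getConfigUrl() {
      return configUrl;
    }

    ManagedTransform withConfig(String yaml) {
      return new ManagedTransform(urn, yaml, configUrl);
    }

    ManagedTransform withConfigUrl(String url) {
      return new ManagedTransform(urn, config, url);
    }
  }
}
```

   With this shape, `ManagedSketch.read("iceberg")` and `ManagedSketch.write("iceberg")` differ only in which listing they consult, and a test can call `new ManagedSketch.ManagedTransform("test:urn", null, null)` without going through either listing.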





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548103214


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   I don't think we want to automatically pick up anything under, say, `beam:schematransform:org.apache.beam:.*_read.*` as managed. We should have a listing that serves the dual purpose of (1) mapping friendly names to URNs and (2) enumerating explicitly what is supported. This listing could be a registry rather than a map here for extensibility.
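
   One way to picture a listing that serves both purposes is a small registry. The sketch below is plain Java under stated assumptions: `ManagedRegistrySketch` and its methods are hypothetical names, not part of Beam or of this PR, and the only value taken from the thread is the iceberg read URN used in the usage note.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical registry: friendly name -> URN, doubling as the supported list. */
final class ManagedRegistrySketch {
  // Insertion-ordered so error messages list names in registration order.
  private final Map<String, String> urnsByName = new LinkedHashMap<>();

  /** Registers a friendly name; duplicate names are rejected. */
  ManagedRegistrySketch register(String friendlyName, String urn) {
    String key = friendlyName.toLowerCase(Locale.ROOT);
    if (urnsByName.putIfAbsent(key, urn) != null) {
      throw new IllegalStateException("Already registered: " + key);
    }
    return this;
  }

  /** Purpose (1): map a friendly name to its URN. */
  String urnFor(String friendlyName) {
    String urn = urnsByName.get(friendlyName.toLowerCase(Locale.ROOT));
    if (urn == null) {
      throw new IllegalArgumentException(
          "Unsupported transform '" + friendlyName + "'. Supported: " + urnsByName.keySet());
    }
    return urn;
  }

  /** Purpose (2): enumerate explicitly what is supported. */
  Set<String> supportedNames() {
    return Collections.unmodifiableSet(urnsByName.keySet());
  }

  /** Only explicitly registered URNs count as managed; no pattern matching. */
  boolean isSupportedUrn(String urn) {
    return urnsByName.containsValue(urn);
  }
}
```

   For example, after `new ManagedRegistrySketch().register("iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1")`, a URN that merely follows the same naming convention is still reported as unsupported by `isSupportedUrn` until someone registers it.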





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546978642


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =
+        Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");
+
+    abstract String getSource();
+
+    abstract Pattern getPattern();

Review Comment:
   What is the meaning of this pattern? (Is Pattern even a valid schema field type?)
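
   For context, the regex in the quoted diff can be exercised on its own. The sketch below copies the pattern string verbatim from the diff; `ReadPatternDemo` and `looksLikeRead` are hypothetical names, and how the PR actually uses the pattern is not shown in this thread, so this only demonstrates which strings the expression accepts.

```java
import java.util.regex.Pattern;

/** Standalone check of which URN strings the quoted read pattern accepts. */
final class ReadPatternDemo {
  // Pattern string copied from the quoted Managed.Read diff.
  static final Pattern PATTERN =
      Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");

  private ReadPatternDemo() {}

  /** True when the whole URN matches the pattern. */
  static boolean looksLikeRead(String urn) {
    return PATTERN.matcher(urn).matches();
  }
}
```

   Under this pattern, `beam:schematransform:org.apache.beam:iceberg_read:v1` matches and the corresponding `iceberg_write` URN does not, but so does any other name containing `_read` in that position, such as a made-up `beam:schematransform:org.apache.beam:some_unvetted_read:v1`.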



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   I realize we don't have any sources yet, but what does the use look like here? 
   
   `p.apply(Managed.read().???)`
   



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {

Review Comment:
   not `ManagedIO`?



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =

Review Comment:
   What is this pattern used for?





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546988098


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {

Review Comment:
   Managed transforms will be more generic than I/O, right? (Even though the first use case will be read/write.)





Re: [PR] [Java] ManagedIO [beam]

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1552206457


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {

Review Comment:
   Yes, I'm totally fine with a static list for the initial release. The key point is getting the API stable and generating the right protos. 





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1546944434


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class Managed {
+  public static final String READ = "READ";
+  public static final String WRITE = "WRITE";
+
+  public enum IO {
+    ICEBERG

Review Comment:
   I suspect the APIs may diverge over time. For example, dynamic destinations and DLQs make sense for `write()` but not for `read()`. Likewise, a watermark or a timestamp policy might make sense for `read()` but not for `write()`.





Re: [PR] [Java] ManagedIO [beam]

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on PR #30808:
URL: https://github.com/apache/beam/pull/30808#issuecomment-2030234554

   Let's also add unit tests using existing SchemaTransforms (we'll have to mock or override either the set of transforms for a group or the pattern used to select that set).




Re: [PR] [Java] ManagedIO [beam]

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #30808:
URL: https://github.com/apache/beam/pull/30808#discussion_r1548051734


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+
+public class Managed {
+  public static Read read() {
+    return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build();
+  }
+
+  @AutoValue
+  public abstract static class Read extends SchemaTransform {
+    protected static final Pattern PATTERN =
+        Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+");
+
+    abstract String getSource();
+
+    abstract Pattern getPattern();

Review Comment:
   As Cham mentions below, it's there to select identifiers that match a pattern (e.g. "_read" for `Managed.read()`), so users wouldn't be able to do something like `Managed.write().to(<kafka read>)`.
   
   > Pattern even a valid schema field type
   
   Thanks for catching that. Yeah, this would be problematic for cross-language use. I will make it a string here instead.
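
A minimal sketch of the idea discussed above, not the PR's actual code: the pattern is kept as a plain `String` (which a schema can carry) and compiled only when an identifier is checked. The class name `ManagedPatternSketch`, the helper `isReadIdentifier`, and the two example identifiers are assumptions made for illustration; only the regex itself is taken from the diff.

```java
import java.util.regex.Pattern;

// Hypothetical helper class, used here only for illustration.
class ManagedPatternSketch {
  // Regex copied from the diff under review, stored as a String rather than a Pattern.
  static final String READ_PATTERN =
      "beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+";

  // Compiles the stored string on use and requires the whole identifier to match.
  static boolean isReadIdentifier(String identifier) {
    return Pattern.compile(READ_PATTERN).matcher(identifier).matches();
  }

  public static void main(String[] args) {
    // Illustrative identifiers, assumed for this example.
    System.out.println(
        isReadIdentifier("beam:schematransform:org.apache.beam:kafka_read:v1")); // true
    System.out.println(
        isReadIdentifier("beam:schematransform:org.apache.beam:kafka_write:v1")); // false
  }
}
```

With a check like this in place, a write-style identifier handed to the read path is rejected before any transform is looked up.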


