Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/21 16:05:30 UTC

[GitHub] [beam] AdalbertMemSQL opened a new pull request, #24290: Implemented SchemaTransforms for SingleStoreIO

AdalbertMemSQL opened a new pull request, #24290:
URL: https://github.com/apache/beam/pull/24290

   Added default RowMapper and UserDataMapper.
   Implemented SchemaTransform for Read, ReadWithPartitions, and Write PTransforms.
   These changes make SingleStoreIO easier to configure and allow it to be used from other languages.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358066503

   Run Java SingleStoreIO_IT




[GitHub] [beam] pabloem commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358373815

   I'll be happy to merge once tests are green again




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1351379794

   Sorry for the holdup, taking a look now




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1052442497


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java:
##########
@@ -37,7 +39,7 @@ final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMa
       DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");
 
   private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
-    assert type.getTypeName().isLogicalType();
+    checkArgument(type.getTypeName().isLogicalType(), "<appropriate error message>");

Review Comment:
   Could you replace the placeholder with an appropriate error message?
   
   P.S.
   Doing another read of this I realize this check isn't necessary heh, sorry for suggesting it earlier.





[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1344235501

   @ahmedabu98 Can you please do one more review of this PR?




[GitHub] [beam] pabloem merged pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
pabloem merged PR #24290:
URL: https://github.com/apache/beam/pull/24290




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1357962954

   Run Java SingleStoreIO_IT




[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1327388909

   R: @Abacn




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1351692071

   Thanks @AdalbertMemSQL, LGTM so far.
   
   I had another design question: is it necessary to create two new nested read classes (`ReadRows` and `ReadWithPartitionsRows`) that cater to Row objects? Much of their code duplicates `Read` and `ReadWithPartitions`, differing only in a few parameters that specify Row output. It works as it is now, but it would be better to stay concise and avoid duplicated code.
   
   One way to reduce this is to add a new readRows() function that calls SingleStoreIO.read() and adds the settings needed to output Rows, and likewise a readWithPartitionsRows() function for partitioned reads. See this [example](https://github.com/apache/beam/blob/ef5351ad50a817498ea9e34a7c514dd9d60fb143/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L575) in BigQueryIO. The same could be applied here, passing in the relevant rowMapper and coder. This will probably need a check at the end of Read/ReadWithPartitions expand() to see whether we are reading Rows, so that it can set the row schema on the output PCollection.
   
   WDYT?
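
The delegation idea in this comment could look roughly like the sketch below. It is not the SingleStoreIO code: `Read`, `withRowMapper`, `withRowOutput`, and the use of `List<String>` in place of Beam's `Row` are hypothetical stand-ins chosen so the example runs with only the JDK. The point is that `readRows()` reuses `read()` and only adds the Row-specific settings, and that a flag remains available for `expand()` to consult.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ReadRowsSketch {

  // Hypothetical stand-in for a generic, immutable Read<T> transform.
  static final class Read<T> {
    private final Function<String, T> rowMapper; // maps a raw record to T
    private final boolean rowOutput;             // true when the output should carry a row schema

    private Read(Function<String, T> rowMapper, boolean rowOutput) {
      this.rowMapper = rowMapper;
      this.rowOutput = rowOutput;
    }

    <U> Read<U> withRowMapper(Function<String, U> mapper) {
      return new Read<>(mapper, rowOutput);
    }

    Read<T> withRowOutput(boolean value) {
      return new Read<>(rowMapper, value);
    }

    // An expand() implementation could check this flag to decide whether to set a schema.
    boolean isRowOutput() {
      return rowOutput;
    }

    T map(String rawRecord) {
      return rowMapper.apply(rawRecord);
    }
  }

  // Generic entry point: no Row-specific behavior.
  static Read<String> read() {
    return new Read<String>(s -> s, false);
  }

  // Row-oriented entry point: delegates to read() instead of duplicating the class.
  static Read<List<String>> readRows() {
    return read()
        .withRowMapper(raw -> Arrays.asList(raw.split(",")))
        .withRowOutput(true);
  }

  public static void main(String[] args) {
    Read<List<String>> rows = readRows();
    System.out.println(rows.map("1,alice") + " rowOutput=" + rows.isRowOutput());
  }
}
```

With this shape, a partitioned variant would follow the same pattern: one extra factory method rather than one extra nested class.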




[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358366936

   I'm ready
   I would be grateful if you could find a committer :)




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358126661

   @AdalbertMemSQL test is running now




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1049891350


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * UserDataMapper that maps {@link Row} objects. ARRAYs, ITERABLEs, MAPs and nested ROWs are not
+ * supported.
+ */
+final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMapper<Row> {
+
+  private final transient DateTimeFormatter formatter =
+      DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");
+
+  private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
+    assert type.getTypeName().isLogicalType();

Review Comment:
   ```suggestion
       checkArgument(
               type.getTypeName().isLogicalType(),
               "<appropriate error message>");
   ```
   
   make sure you're importing 
   `import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;`
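
A likely motivation for the suggestion above is that Java `assert` statements are skipped unless the JVM is started with `-ea`, whereas a precondition check always runs. The sketch below illustrates this under stated assumptions: `checkArgument` here is a local stand-in that follows the contract of Guava's `Preconditions.checkArgument(boolean, Object)` (it throws `IllegalArgumentException`), so the example runs without Beam's vendored Guava. The boolean parameter and the error message are hypothetical placeholders for `type.getTypeName().isLogicalType()` and the final wording.

```java
final class PreconditionSketch {

  // Local stand-in mirroring Guava's Preconditions.checkArgument(boolean, Object).
  static void checkArgument(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(String.valueOf(errorMessage));
    }
  }

  // Hypothetical helper: isLogicalType stands in for type.getTypeName().isLogicalType().
  static String convertLogicalTypeFieldToString(boolean isLogicalType, Object value) {
    // Unlike `assert`, this check runs regardless of JVM flags.
    checkArgument(isLogicalType, "Expected a logical type field");
    return String.valueOf(value);
  }

  public static void main(String[] args) {
    System.out.println(convertLogicalTypeFieldToString(true, 42));
    try {
      convertLogicalTypeFieldToString(false, 42);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```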





[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353277844

   Run Java SingleStoreIO_IT




[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1327389084

   R: @johnjcasey




[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1322293506

   addresses #22617




[GitHub] [beam] github-actions[bot] commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1327390106

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] johnjcasey commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1329321873

   @AdalbertMemSQL it is probably worth putting some of these classes into a subdirectory (singlestore/schematransform) to help with organization




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1048556442


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMapper<Row> {
+
+  private final transient DateTimeFormatter formatter =
+      DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");
+
+  private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
+    Schema.LogicalType<Object, Object> logicalType =
+        (Schema.LogicalType<Object, Object>) type.getLogicalType();
+    if (logicalType == null) {
+      throw new UnsupportedOperationException("Failed to extract logical type");
+    }

Review Comment:
   You can make use of [`FieldType::isLogicalType`](https://github.com/apache/beam/blob/3ffaa49c3217386291fa163d0a868a686b329ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L643) as a check here.



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from SingleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<SingleStoreSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<SingleStoreSchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  public Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @Nullable
+  public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+  @Nullable
+  public abstract String getQuery();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract Boolean getOutputParallelization();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);

Review Comment:
   Just tested this out locally with a simple configuration that had a POJO field, and generating a schema worked fine with just the `@AutoValue` annotation:
   ```
   Field{name=field1, description=, type=STRING NOT NULL, options={{}}}
   Field{name=field2, description=, type=INT32 NOT NULL, options={{}}}
   Field{name=pojoField, description=, type=ROW<
       pojoField1 STRING NOT NULL, 
       pojoField2 INT32 NOT NULL
     > NOT NULL, options={{}}}
   ```



##########
.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import PostcommitJobBuilder
+import Kubernetes
+
+String jobName = "beam_PostCommit_Java_SingleStoreIO_IT"
+
+void waitForPodWithLabel(job, Kubernetes k8s, String label) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600")
+  }
+}
+
+void waitFor(job, Kubernetes k8s, String resource) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600")
+  }
+}
+
+
+// This job runs the integration test of java SingleStoreIO class.
+PostcommitJobBuilder.postCommitJob(jobName,
+    'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test',this) {
+      description('Runs the Java SingleStoreIO Integration Test.')
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate)

Review Comment:
   Can you set a timeout here? The goal is not a strict time limit for the job but a way to catch runaway jobs; see the example [here](https://github.com/apache/beam/blob/ef5351ad50a817498ea9e34a7c514dd9d60fb143/.test-infra/jenkins/job_PostCommit_Java.groovy#L32).





[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353211170

   Run Java SingleStoreIO_IT




[GitHub] [beam] github-actions[bot] commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1327389837

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358118069

   Run Java SingleStoreIO_IT




[GitHub] [beam] pabloem commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358448423

   alright merging! Thanks everyone. Very happy to get this in!




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353457204

   Integration test failed due to problems connecting to jdbc: https://ci-beam.apache.org/job/beam_PostCommit_Java_SingleStoreIO_IT_PR/1/console


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353726853

   > Integration test failed due to problems connecting to jdbc: https://ci-beam.apache.org/job/beam_PostCommit_Java_SingleStoreIO_IT_PR/1/console
   
   I hope it is fixed now. 




[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1351807221

   I like this idea.
   Will try to implement it.




[GitHub] [beam] AdalbertMemSQL commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1049856241


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java:
##########
@@ -370,8 +386,6 @@ public DataSource getDataSource() {
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
-    abstract @Nullable Coder<T> getCoder();

Review Comment:
   Users can now do this with RowMapperWithCoder:
   https://github.com/apache/beam/pull/24290/files#diff-1ae81cb3e2f6d00213f38c6ebcee815cea50c993e9a4a0514b0d93f7837af0bcR237-R240
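
As a rough illustration of that idea, the sketch below lets the mapper itself supply the coder, so the read transform no longer needs its own `getCoder()`. All types here (`Coder`, `RowMapper`, `RowMapperWithCoder`, `resolveCoder`, `TrimmingMapper`) are simplified, hypothetical stand-ins rather than the Beam or SingleStoreIO definitions, and the fallback coder is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;

final class MapperWithCoderSketch {

  // Hypothetical, heavily simplified stand-in for a coder.
  interface Coder<T> {
    byte[] encode(T value);
  }

  // Hypothetical stand-in for a row mapper.
  interface RowMapper<T> {
    T mapRow(String rawRecord);
  }

  // A mapper that also knows how to encode the values it produces.
  interface RowMapperWithCoder<T> extends RowMapper<T> {
    Coder<T> getCoder();
  }

  // Uses the mapper's coder when it provides one; otherwise uses the fallback.
  static <T> Coder<T> resolveCoder(RowMapper<T> mapper, Coder<T> fallback) {
    if (mapper instanceof RowMapperWithCoder) {
      return ((RowMapperWithCoder<T>) mapper).getCoder();
    }
    return fallback;
  }

  // Example mapper that trims the raw record and encodes results as UTF-8.
  static final class TrimmingMapper implements RowMapperWithCoder<String> {
    @Override
    public String mapRow(String rawRecord) {
      return rawRecord.trim();
    }

    @Override
    public Coder<String> getCoder() {
      return value -> value.getBytes(StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    TrimmingMapper mapper = new TrimmingMapper();
    Coder<String> coder = resolveCoder(mapper, value -> new byte[0]);
    System.out.println(coder.encode(mapper.mapRow("  ab  ")).length);
  }
}
```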





[GitHub] [beam] johnjcasey commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1329319372

   r: @ahmedabu98 




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1035263994


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+   * {@link SingleStoreSchemaTransformReadConfiguration}.
+   */
+  private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+   * SingleStoreSchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      String query = configuration.getQuery();
+      Boolean outputParallelization = configuration.getOutputParallelization();
+
+      SingleStoreIO.ReadRows read = SingleStoreIO.readRows();
+
+      if (dataSourceConfiguration != null) {
+        read = read.withDataSourceConfiguration(dataSourceConfiguration);
+      }
+
+      if (table != null) {

Review Comment:
   ```suggestion
         if (!Strings.isNullOrEmpty(table)) {
   ```



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from SignleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<SingleStoreSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<SingleStoreSchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  public Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @Nullable
+  public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+  @Nullable
+  public abstract String getQuery();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract Boolean getOutputParallelization();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);

Review Comment:
   When a remote SDK tries to prepare a configuration Row object to use this IO, how would it set the dataSourceConfiguration? The `DataSourceConfiguration` POJO only exists in the Java SDK
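
   One possible direction, sketched in plain Java under the assumption that the connection settings are flattened into string fields that any SDK can express in a Row. The class `FlatReadConfig`, its field names, and the map-based round trip are hypothetical illustrations and are not part of this PR or of the Beam API:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical flat configuration: only strings, no Java-only POJO fields. */
   final class FlatReadConfig {
     final String endpoint;
     final String database;
     final String username;
     final String table;

     FlatReadConfig(String endpoint, String database, String username, String table) {
       this.endpoint = endpoint;
       this.database = database;
       this.username = username;
       this.table = table;
     }

     /** Field-name to value view, standing in for what a schema-aware Row would carry. */
     Map<String, Object> toFieldMap() {
       Map<String, Object> fields = new LinkedHashMap<>();
       fields.put("endpoint", endpoint);
       fields.put("database", database);
       fields.put("username", username);
       fields.put("table", table);
       return fields;
     }

     /** Rebuilds the configuration from the flat field view. */
     static FlatReadConfig fromFieldMap(Map<String, Object> fields) {
       return new FlatReadConfig(
           (String) fields.get("endpoint"),
           (String) fields.get("database"),
           (String) fields.get("username"),
           (String) fields.get("table"));
     }
   }
   ```

   Because every field is a primitive-like type, a non-Java SDK could populate the same field names without needing the `DataSourceConfiguration` class.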



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadWithPartitionsProvider.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB parallel read jobs
+ * configured using {@link SingleStoreSchemaTransformReadWithPartitionsConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadWithPartitionsProvider

Review Comment:
   There's a lot of overlap between this and the `SingleStoreSchemaTransformReadProvider` and configuration classes. I think the only difference is two configuration parameters (this one uses the initialNumReaders parameter and the other uses the outputParallelization parameter).
   
   Would it make sense to combine these two sets of classes into one that includes both parameters? You can add a new `readWithPartitions` boolean parameter that would distinguish between the two read modes. 
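
   A minimal sketch of what a merged configuration might look like, in plain Java. The class `MergedReadConfig`, the field `withPartitions`, and the returned mode names are hypothetical; the validation rules are an assumption about how mode-specific parameters could be guarded, not something the PR defines:

   ```java
   /** Hypothetical merged configuration covering both read modes. */
   final class MergedReadConfig {
     final boolean withPartitions;
     final Boolean outputParallelization; // nullable; only meaningful for sequential reads
     final Integer initialNumReaders; // nullable; only meaningful for partitioned reads

     MergedReadConfig(
         boolean withPartitions, Boolean outputParallelization, Integer initialNumReaders) {
       this.withPartitions = withPartitions;
       this.outputParallelization = outputParallelization;
       this.initialNumReaders = initialNumReaders;
     }

     /** Names the read mode a merged provider would select, rejecting mismatched parameters. */
     String selectReadMode() {
       if (withPartitions) {
         if (outputParallelization != null) {
           throw new IllegalArgumentException(
               "outputParallelization applies only to sequential reads");
         }
         return "readWithPartitions";
       }
       if (initialNumReaders != null) {
         throw new IllegalArgumentException(
             "initialNumReaders applies only to partitioned reads");
       }
       return "read";
     }
   }
   ```

   Failing fast on a parameter that does not belong to the selected mode keeps a single configuration class from silently ignoring user input.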



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+   * {@link SingleStoreSchemaTransformReadConfiguration}.
+   */
+  private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+   * SingleStoreSchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      String query = configuration.getQuery();
+      Boolean outputParallelization = configuration.getOutputParallelization();
+
+      SingleStoreIO.ReadRows read = SingleStoreIO.readRows();
+
+      if (dataSourceConfiguration != null) {
+        read = read.withDataSourceConfiguration(dataSourceConfiguration);
+      }
+
+      if (table != null) {

Review Comment:
   Use `Strings.isNullOrEmpty` to catch the `""` edge case. Consider using it for these checks in the other SchemaTransformProvider classes as well.
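
   To illustrate the edge case, here is a small self-contained sketch. The class `EmptyStringCheck` and both method names are hypothetical; `isNullOrEmpty` here is a local stand-in written in plain Java so that the example runs without Guava:

   ```java
   /** Hypothetical demo comparing a plain null check with a null-or-empty check. */
   final class EmptyStringCheck {

     /** Mirrors the original `table != null` condition. */
     static boolean passesNullCheck(String value) {
       return value != null;
     }

     /** Local stand-in for a null-or-empty test on a string. */
     static boolean isNullOrEmpty(String value) {
       return value == null || value.isEmpty();
     }

     public static void main(String[] args) {
       String table = "";
       // The null check lets the empty string through, so withTable("") would be called.
       System.out.println(passesNullCheck(table));
       // The null-or-empty check treats "" the same as an unset value.
       System.out.println(!isNullOrEmpty(table));
     }
   }
   ```

   An empty string is a likely value when a configuration Row arrives from another SDK that has no notion of null for unset string fields.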



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB write jobs configured
+ * using {@link SingleStoreSchemaTransformWriteConfiguration}.
+ */
+public class SingleStoreSchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformWriteConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+  public static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformWriteConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) {
+    return new SingleStoreWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:write", API);

Review Comment:
   See previous comment for URN conventions



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadWithPartitionsProvider.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB parallel read jobs
+ * configured using {@link SingleStoreSchemaTransformReadWithPartitionsConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadWithPartitionsProvider

Review Comment:
   Some thought should be put into this decision. Merging the two read modes would make sense as it is now, but if it's likely that these two modes will develop down the line to have many more differences then keeping them separate makes more sense.



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }

Review Comment:
   It's encouraged to adhere to the URN conventions here: https://beam.apache.org/documentation/programming-guide/#1314-defining-a-urn
   
   for example, something like `beam:schematransform:org.apache.beam:singlestore_read:v1`
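
   As a rough sketch, the identifier could be assembled from its parts as below. The helper class `SchemaTransformUrn` and its method are hypothetical; the prefix and layout simply follow the example URN given above:

   ```java
   /** Hypothetical helper that builds an identifier in the shape of the example URN. */
   final class SchemaTransformUrn {
     private static final String PREFIX = "beam:schematransform:org.apache.beam";

     /** Joins the prefix, a transform name such as "singlestore_read", and a version number. */
     static String of(String transformName, int version) {
       return String.format("%s:%s:v%d", PREFIX, transformName, version);
     }
   }
   ```

   With this helper, `identifier()` could return `SchemaTransformUrn.of("singlestore_read", 1)` in place of the current `String.format("%s:read", API)`.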



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+   * {@link SingleStoreSchemaTransformReadConfiguration}.
+   */
+  private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+   * SingleStoreSchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      String query = configuration.getQuery();
+      Boolean outputParallelization = configuration.getOutputParallelization();
+
+      SingleStoreIO.ReadRows read = SingleStoreIO.readRows();
+
+      if (dataSourceConfiguration != null) {
+        read = read.withDataSourceConfiguration(dataSourceConfiguration);
+      }
+
+      if (table != null) {
+        read = read.withTable(table);
+      }
+
+      if (query != null) {

Review Comment:
   ```suggestion
         if (!Strings.isNullOrEmpty(query)) {
   ```





[GitHub] [beam] AdalbertMemSQL commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1036092013


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadWithPartitionsProvider.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB parallel read jobs
+ * configured using {@link SingleStoreSchemaTransformReadWithPartitionsConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadWithPartitionsProvider

Review Comment:
   The `initialNumReaders` parameter has already been removed, so the only remaining difference is the `outputParallelization` parameter, which makes sense only for sequential reads. I don't think these read modes will evolve much, so I will try to merge their SchemaTransforms.





[GitHub] [beam] AdalbertMemSQL commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1036150848


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from SingleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<SingleStoreSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<SingleStoreSchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  public Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @Nullable
+  public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+  @Nullable
+  public abstract String getQuery();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract Boolean getOutputParallelization();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);

Review Comment:
   Hmm...
   That's a good question.
   Is it correct that if I add `@DefaultSchema(AutoValueSchema.class)` to the `DataSourceConfiguration` class, Beam will infer its schema, and an object with the same schema can then be created in other SDKs?





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1052442497


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java:
##########
@@ -37,7 +39,7 @@ final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMa
       DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");
 
   private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
-    assert type.getTypeName().isLogicalType();
+    checkArgument(type.getTypeName().isLogicalType(), "<appropriate error message>");

Review Comment:
   Could you replace the placeholder with an appropriate error message?
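
A minimal sketch of a descriptive precondition message, assuming a hand-written `checkArgument` helper that uses only the JDK. `LogicalTypeChecks`, the simplified method signature, and the message wording are all hypothetical and not taken from the PR. Unlike `assert`, which the JVM skips unless assertions are enabled, this check always runs:

```java
final class LogicalTypeChecks {
  /** JDK-only precondition helper: throws IllegalArgumentException with a formatted message. */
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Hypothetical, simplified stand-in for the mapper method under review. */
  static String convertLogicalTypeFieldToString(
      String typeName, boolean isLogicalType, Object value) {
    // Name the offending type so the failure is actionable for the caller.
    checkArgument(isLogicalType, "Expected a logical type but got %s", typeName);
    return String.valueOf(value);
  }
}
```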





[GitHub] [beam] adalbert44 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
adalbert44 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1040898724


##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+
+  private static final String API = "singlestore";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+   * {@link SingleStoreSchemaTransformReadConfiguration}.
+   */
+  private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+   * SingleStoreSchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      String query = configuration.getQuery();
+      Boolean outputParallelization = configuration.getOutputParallelization();
+
+      SingleStoreIO.ReadRows read = SingleStoreIO.readRows();
+
+      if (dataSourceConfiguration != null) {
+        read = read.withDataSourceConfiguration(dataSourceConfiguration);
+      }
+
+      if (table != null) {

Review Comment:
   SpotBugs doesn't recognize this check:
   ```
   /home/amakarovych-ua/projects/beam/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java:343: error: [argument] incompatible argument for parameter password of setPassword.
           basicDataSource.setPassword(password);
                                       ^
     found   : @Initialized @Nullable String
     required: @Initialized @NonNull String
   ```
   
   I will use `table != null && !table.isEmpty()` instead.
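
A minimal sketch of that pattern, assuming a hypothetical `Read` class in place of the real SingleStoreIO transform (none of the names below come from the PR). The explicit comparison with `null` sits directly in the condition, which is a form that static nullness analysis generally understands, so the guarded call receives a value known to be non-null:

```java
import java.util.Objects;

final class TableReadSketch {
  /** Hypothetical immutable read description; not the SingleStoreIO API. */
  static final class Read {
    final String table; // empty string means "no table configured"

    Read(String table) {
      this.table = table;
    }

    /** Mirrors a setter whose parameter must be non-null. */
    Read withTable(String table) {
      return new Read(Objects.requireNonNull(table));
    }
  }

  /** Applies the table only when it is present and non-empty. */
  static Read configure(String table) {
    Read read = new Read("");
    if (table != null && !table.isEmpty()) {
      read = read.withTable(table);
    }
    return read;
  }
}
```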





[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353229036

   Run Java SingleStoreIO_IT




[GitHub] [beam] AdalbertMemSQL commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1048585632


##########
.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import PostcommitJobBuilder
+import Kubernetes
+
+String jobName = "beam_PostCommit_Java_SingleStoreIO_IT"
+
+void waitForPodWithLabel(job, Kubernetes k8s, String label) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600")
+  }
+}
+
+void waitFor(job, Kubernetes k8s, String resource) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600")
+  }
+}
+
+
+// This job runs the integration test of java SingleStoreIO class.
+PostcommitJobBuilder.postCommitJob(jobName,
+    'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test',this) {
+      description('Runs the Java SingleStoreIO Integration Test.')
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate)

Review Comment:
   Looks like the default timeout is already set to 100:
   ```
     // Sets common top-level job properties for main repository jobs.
     static void setTopLevelMainJobProperties(def context,
         String defaultBranch = 'master',
         int defaultTimeout = 100,
         boolean allowRemotePoll = true,
         String jenkinsExecutorLabel = 'beam',
         boolean cleanWorkspace = true) {
   
   ```





[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353303026

   Seed job failed here: https://ci-beam.apache.org/job/beam_SeedJob/10792/console
   
   Seeing `ERROR: (job_PostCommit_Java_SingleStoreIO_IT.groovy, line 44) No such property: commonJobProperties for class: javaposse.jobdsl.dsl.jobs.FreeStyleJob`




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353380712

   Run Java SingleStoreIO_IT




[GitHub] [beam] github-actions[bot] commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1322382999

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @apilloud for label java.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] lukecwik commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1357958536

   Run Seed Job




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1358117921

   
   Run Java SingleStoreIO_IT
   
   




[GitHub] [beam] Abacn commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1361718475

   Looks like this PR breaks SingleStoreIO performance test: https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SingleStoreIO/79/
   
   because the test source file has been renamed to SingleStoreIOPerformanceIT.java instead of SingleStoreIOITPerformance.java.




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24290:
URL: https://github.com/apache/beam/pull/24290#discussion_r1049819821


##########
.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import PostcommitJobBuilder
+import Kubernetes
+
+String jobName = "beam_PostCommit_Java_SingleStoreIO_IT"
+
+void waitForPodWithLabel(job, Kubernetes k8s, String label) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600")
+  }
+}
+
+void waitFor(job, Kubernetes k8s, String resource) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600")
+  }
+}
+
+
+// This job runs the integration test of java SingleStoreIO class.
+PostcommitJobBuilder.postCommitJob(jobName,
+    'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test',this) {
+      description('Runs the Java SingleStoreIO Integration Test.')
+
+      // Set common parameters.
+      commonJobProperties.setTopLevelMainJobProperties(delegate)

Review Comment:
   ```suggestion
         common.setTopLevelMainJobProperties(delegate)
   ```
   
   typo here 



##########
sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java:
##########
@@ -370,8 +386,6 @@ public DataSource getDataSource() {
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
-    abstract @Nullable Coder<T> getCoder();

Review Comment:
   Why not keep the option for users to set their own coders?





[GitHub] [beam] AdalbertMemSQL commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
AdalbertMemSQL commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1353201217

   @ahmedabu98 Can you please trigger the job defined in job_PostCommit_Java_SingleStoreIO_IT.groovy?




[GitHub] [beam] ahmedabu98 commented on pull request #24290: Implemented SchemaTransforms for SingleStoreIO

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24290:
URL: https://github.com/apache/beam/pull/24290#issuecomment-1351731694

   FYI these suggestions ^ (if you decide to implement them) can also be done in a later PR. I don't see any blockers for merging this one. Let me know what you decide.

