You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/19 21:55:28 UTC

[beam] branch master updated: Implemented SchemaTransforms for SingleStoreIO (#24290)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 44aee66979d Implemented SchemaTransforms for SingleStoreIO (#24290)
44aee66979d is described below

commit 44aee66979db73e7a0cda4a4b668bf69f8b4d0c6
Author: AdalbertMemSQL <55...@users.noreply.github.com>
AuthorDate: Mon Dec 19 23:55:20 2022 +0200

    Implemented SchemaTransforms for SingleStoreIO (#24290)
    
    * Implemented SchemaTransforms for SingleStoreIO
    Added default RowMapper and UserDataMapper
    These changes will allow to configure SingleStoreIO easier and to use it with other languages
    
    * Fixed nullable errors
    
    * Changed to don't use .* form of import
    
    * Changed formatter field to be transient
    
    * Nit reformatting
    
    * Fixed bugs in tests
    
    * Moved schema transform classes to the separate folder
    
    * Removed unused imports
    
    * Added package-info file
    
    * check point
    
    * check point
    
    * Resolved comments
    Added DefaultSchema for DataSourceConfiguration
    Changed URNs
    Added checks for empty strings
    Deleted ReadWithPartitions schema transform and added withPartitions options to Read schema transform
    
    * Changed identation
    
    * Fixed build by adding a cast
    
    * Reformatted code
    
    * Added an assertion that convertLogicalTypeFieldToString is called only with logical types
    
    * Refactored code to delete ReadRows and ReadRowsWithPartitions classes
    
    * Update .test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
    
    Co-authored-by: Ahmed Abualsaud <65...@users.noreply.github.com>
    
    * Fixed bug where env variable name was used instead of the value
    
    * Changed to use checkArgument instead of assert
    
    * Added appropriate error message
    
    Co-authored-by: Ahmed Abualsaud <65...@users.noreply.github.com>
---
 .../job_PerformanceTests_SingleStoreIO.groovy      |   2 +-
 .../job_PostCommit_Java_SingleStoreIO_IT.groovy    |  86 ++++++
 .../singlestore/SingleStoreDefaultRowMapper.java   | 287 +++++++++++++++++++++
 .../SingleStoreDefaultUserDataMapper.java          | 114 ++++++++
 .../beam/sdk/io/singlestore/SingleStoreIO.java     |  89 +++++++
 .../beam/sdk/io/singlestore/SingleStoreUtil.java   |   8 +
 ...ingleStoreSchemaTransformReadConfiguration.java |  85 ++++++
 .../SingleStoreSchemaTransformReadProvider.java    | 173 +++++++++++++
 ...ngleStoreSchemaTransformWriteConfiguration.java |  75 ++++++
 .../SingleStoreSchemaTransformWriteProvider.java   | 156 +++++++++++
 .../singlestore/schematransform/package-info.java  |  20 ++
 .../SingleStoreDefaultRowMapperTest.java           | 255 ++++++++++++++++++
 .../SingleStoreDefaultUserDataMapperTest.java      | 136 ++++++++++
 .../singlestore/SingleStoreIODefaultMapperIT.java  | 252 ++++++++++++++++++
 ...reIOIT.java => SingleStoreIOPerformanceIT.java} |  21 +-
 ...IT.java => SingleStoreIOSchemaTransformIT.java} | 213 +++++++--------
 .../sdk/io/singlestore/SingleStoreUtilTest.java    |  17 ++
 .../apache/beam/sdk/io/singlestore/TestHelper.java |  19 ++
 18 files changed, 1884 insertions(+), 124 deletions(-)

diff --git a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
index a5fbc278a24..2d6df33f4f1 100644
--- a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
@@ -80,7 +80,7 @@ job(jobName) {
       switches("--info")
       switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
       switches("-DintegrationTestRunner=dataflow")
-      tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOIT")
+      tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOITPerformance")
     }
   }
 }
diff --git a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
new file mode 100644
index 00000000000..f43da5a0531
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import PostcommitJobBuilder
+import Kubernetes
+
+String jobName = "beam_PostCommit_Java_SingleStoreIO_IT"
+
+void waitForPodWithLabel(job, Kubernetes k8s, String label) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600")
+  }
+}
+
+void waitFor(job, Kubernetes k8s, String resource) {
+  job.steps {
+    shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600")
+  }
+}
+
+
+// This job runs the integration test of java SingleStoreIO class.
+PostcommitJobBuilder.postCommitJob(jobName,
+    'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test',this) {
+      description('Runs the Java SingleStoreIO Integration Test.')
+
+      // Set common parameters.
+      common.setTopLevelMainJobProperties(delegate)
+
+      // Deploy SingleStoreDB cluster
+      String namespace = common.getKubernetesNamespace(jobName)
+      String kubeconfigPath = common.getKubeconfigLocationForNamespace(namespace)
+      Kubernetes k8s = Kubernetes.create(delegate, kubeconfigPath, namespace)
+
+      k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-rbac.yaml"))
+      k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster-crd.yaml"))
+      k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-operator.yaml"))
+      waitForPodWithLabel(delegate, k8s, "sdb-operator")
+
+      k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster.yaml"))
+      waitFor(delegate, k8s, "memsqlclusters.memsql.com")
+
+      String singlestoreHostName = "LOAD_BALANCER_IP"
+      k8s.loadBalancerIP("svc-sdb-cluster-ddl", singlestoreHostName)
+
+      // Define test options
+      Map pipelineOptions = [
+        tempRoot                  : 'gs://temp-storage-for-perf-tests',
+        project                   : 'apache-beam-testing',
+        runner                    : 'DataflowRunner',
+        singleStoreServerName     : "\$${singlestoreHostName}",
+        singleStoreUsername : "admin",
+        singleStorePassword : "secretpass",
+        singleStorePort: "3306",
+        numberOfRecords: "1000",
+      ]
+
+      // Gradle goals for this job.
+      steps {
+        gradle {
+          rootBuildScriptDir(common.checkoutDir)
+          common.setGradleSwitches(delegate)
+          switches("--info")
+          switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
+          switches("-DintegrationTestRunner=dataflow")
+          tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIODefaultMapperIT")
+          tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOSchemaTransformIT")
+        }
+      }
+    }
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java
new file mode 100644
index 00000000000..72d5f3ffa28
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NULL;
+import static java.sql.Types.REAL;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.chrono.ISOChronology;
+
+/** RowMapper that maps {@link ResultSet} row to the {@link Row}. */
+class SingleStoreDefaultRowMapper
+    implements SingleStoreIO.RowMapperWithInit<Row>, SingleStoreIO.RowMapperWithCoder<Row> {
+  @Nullable Schema schema = null;
+  List<ResultSetFieldConverter> converters = new ArrayList<>();
+
+  @Override
+  public void init(ResultSetMetaData metaData) throws SQLException {
+    for (int i = 0; i < metaData.getColumnCount(); i++) {
+      converters.add(ResultSetFieldConverter.of(metaData.getColumnType(i + 1)));
+    }
+
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    for (int i = 0; i < metaData.getColumnCount(); i++) {
+      schemaBuilder.addField(converters.get(i).getSchemaField(metaData, i + 1));
+    }
+    this.schema = schemaBuilder.build();
+  }
+
+  @Override
+  public Row mapRow(ResultSet resultSet) throws Exception {
+    if (schema == null) {
+      throw new UnsupportedOperationException("mapRow is called before init");
+    }
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+
+    int fieldCount = schema.getFieldCount();
+    for (int i = 0; i < fieldCount; i++) {
+      Object value = converters.get(i).getValue(resultSet, i + 1);
+
+      if (resultSet.wasNull() || value == null) {
+        rowBuilder.addValue(null);
+      } else {
+        rowBuilder.addValue(value);
+      }
+    }
+
+    return rowBuilder.build();
+  }
+
+  @Override
+  public SchemaCoder<Row> getCoder() throws Exception {
+    if (schema == null) {
+      throw new UnsupportedOperationException("getCoder is called before init");
+    }
+
+    return RowCoder.of(this.schema);
+  }
+
+  abstract static class ResultSetFieldConverter implements Serializable {
+    abstract @Nullable Object getValue(ResultSet rs, Integer index) throws SQLException;
+
+    Schema.Field getSchemaField(ResultSetMetaData md, Integer index) throws SQLException {
+      String label = md.getColumnLabel(index);
+      return Schema.Field.of(label, getSchemaFieldType(md, index))
+          .withNullable(md.isNullable(index) == java.sql.ResultSetMetaData.columnNullable);
+    }
+
+    abstract Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index)
+        throws SQLException;
+
+    /**
+     * Interface implemented by functions that extract values of different types from a JDBC
+     * ResultSet.
+     */
+    @FunctionalInterface
+    interface ResultSetFieldExtractor extends Serializable {
+      @Nullable
+      Object extract(ResultSet rs, Integer index) throws SQLException;
+    }
+
+    static ResultSetFieldConverter of(int columnType) {
+      switch (columnType) {
+        case BIT:
+          return new DirectResultSetFieldConverter(BOOLEAN, ResultSet::getBoolean);
+        case TINYINT:
+          return new DirectResultSetFieldConverter(BYTE, ResultSet::getByte);
+        case SMALLINT:
+          return new DirectResultSetFieldConverter(INT16, ResultSet::getShort);
+        case INTEGER:
+          return new DirectResultSetFieldConverter(INT32, ResultSet::getInt);
+        case BIGINT:
+          return new DirectResultSetFieldConverter(INT64, ResultSet::getLong);
+        case REAL:
+          return new DirectResultSetFieldConverter(FLOAT, ResultSet::getFloat);
+        case DOUBLE:
+          return new DirectResultSetFieldConverter(Schema.FieldType.DOUBLE, ResultSet::getDouble);
+        case DECIMAL:
+          return new DirectResultSetFieldConverter(
+              Schema.FieldType.DECIMAL, ResultSet::getBigDecimal);
+        case TIMESTAMP:
+          return new TimestampResultSetFieldConverter();
+        case DATE:
+          return new DateResultSetFieldConverter();
+        case TIME:
+          return new TimeResultSetFieldConverter();
+        case LONGVARBINARY:
+        case VARBINARY:
+        case BINARY:
+          return new BinaryResultSetFieldConverter();
+        case LONGVARCHAR:
+        case VARCHAR:
+        case CHAR:
+          return new CharResultSetFieldConverter();
+        case NULL:
+          return new DirectResultSetFieldConverter(STRING, ResultSet::getString);
+        default:
+          throw new UnsupportedOperationException(
+              "Converting " + columnType + " to Beam schema type is not supported");
+      }
+    }
+  }
+
+  static class DirectResultSetFieldConverter extends ResultSetFieldConverter {
+    Schema.FieldType fieldType;
+    ResultSetFieldExtractor extractor;
+
+    public DirectResultSetFieldConverter(
+        Schema.FieldType fieldType, ResultSetFieldExtractor extractor) {
+      this.fieldType = fieldType;
+      this.extractor = extractor;
+    }
+
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      return extractor.extract(rs, index);
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+      return fieldType;
+    }
+  }
+
+  static class CharResultSetFieldConverter extends ResultSetFieldConverter {
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      return rs.getString(index);
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException {
+      int size = md.getPrecision(index);
+      return Schema.FieldType.logicalType(VariableString.of(size));
+    }
+  }
+
+  static class BinaryResultSetFieldConverter extends ResultSetFieldConverter {
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      return rs.getBytes(index);
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException {
+      int size = md.getPrecision(index);
+      return Schema.FieldType.logicalType(VariableBytes.of(size));
+    }
+  }
+
+  static class TimestampResultSetFieldConverter extends ResultSetFieldConverter {
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      Timestamp ts =
+          rs.getTimestamp(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
+      if (ts == null) {
+        return null;
+      }
+      return new DateTime(ts.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+      return Schema.FieldType.DATETIME;
+    }
+  }
+
+  static class TimeResultSetFieldConverter extends ResultSetFieldConverter {
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      Time time = rs.getTime(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
+      if (time == null) {
+        return null;
+      }
+      return new DateTime(time.getTime(), ISOChronology.getInstanceUTC())
+          .withDate(new LocalDate(0L));
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+      return Schema.FieldType.DATETIME;
+    }
+  }
+
+  static class DateResultSetFieldConverter extends ResultSetFieldConverter {
+    @Override
+    @Nullable
+    Object getValue(ResultSet rs, Integer index) throws SQLException {
+      // TODO(https://github.com/apache/beam/issues/19215) import when joda LocalDate is removed.
+      java.time.LocalDate date = rs.getObject(index, java.time.LocalDate.class);
+      if (date == null) {
+        return null;
+      }
+      ZonedDateTime zdt = date.atStartOfDay(ZoneOffset.UTC);
+      return new DateTime(zdt.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
+    }
+
+    @Override
+    Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+      return Schema.FieldType.DATETIME;
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java
new file mode 100644
index 00000000000..2035ce0554f
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * UserDataMapper that maps {@link Row} objects. ARRAYs, ITTERABLEs, MAPs and nested ROWs are not
+ * supported.
+ */
+final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMapper<Row> {
+
+  private final transient DateTimeFormatter formatter =
+      DateTimeFormat.forPattern("yyyy-MM-DD' 'HH:mm:ss.SSS");
+
+  private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
+    checkArgument(
+        type.getTypeName().isLogicalType(),
+        "convertLogicalTypeFieldToString accepts only logical types");
+
+    Schema.LogicalType<Object, Object> logicalType =
+        (Schema.LogicalType<Object, Object>) type.getLogicalType();
+    if (logicalType == null) {
+      throw new UnsupportedOperationException("Failed to extract logical type");
+    }
+
+    Schema.FieldType baseType = logicalType.getBaseType();
+    Object baseValue = logicalType.toBaseType(value);
+    return convertFieldToString(baseType, baseValue);
+  }
+
+  private String convertFieldToString(Schema.FieldType type, Object value) {
+    switch (type.getTypeName()) {
+      case BYTE:
+        return ((Byte) value).toString();
+      case INT16:
+        return ((Short) value).toString();
+      case INT32:
+        return ((Integer) value).toString();
+      case INT64:
+        return ((Long) value).toString();
+      case DECIMAL:
+        return ((BigDecimal) value).toString();
+      case FLOAT:
+        return ((Float) value).toString();
+      case DOUBLE:
+        return ((Double) value).toString();
+      case STRING:
+        return (String) value;
+      case DATETIME:
+        return formatter.print((Instant) value);
+      case BOOLEAN:
+        return ((Boolean) value) ? "1" : "0";
+      case BYTES:
+        return new String((byte[]) value, StandardCharsets.UTF_8);
+      case ARRAY:
+        throw new UnsupportedOperationException(
+            "Writing of ARRAY type is not supported by the default UserDataMapper");
+      case ITERABLE:
+        throw new UnsupportedOperationException(
+            "Writing of ITERABLE type is not supported by the default UserDataMapper");
+      case MAP:
+        throw new UnsupportedOperationException(
+            "Writing of MAP type is not supported by the default UserDataMapper");
+      case ROW:
+        throw new UnsupportedOperationException(
+            "Writing of nested ROW type is not supported by the default UserDataMapper");
+      case LOGICAL_TYPE:
+        return convertLogicalTypeFieldToString(type, value);
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Writing of %s type is not supported by the default UserDataMapper",
+                type.getTypeName().name()));
+    }
+  }
+
+  @Override
+  public List<String> mapRow(Row element) {
+    List<String> res = new ArrayList<>();
+
+    Schema s = element.getSchema();
+    for (int i = 0; i < s.getFieldCount(); i++) {
+      res.add(convertFieldToString(s.getField(i).getType(), element.getValue(i)));
+    }
+
+    return res;
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index 6873ae6b8b3..de2a6805278 100644
--- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +39,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
@@ -53,6 +56,7 @@ import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.dbcp2.DelegatingStatement;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -168,6 +172,14 @@ public class SingleStoreIO {
     return new AutoValue_SingleStoreIO_Read.Builder<T>().setOutputParallelization(true).build();
   }
 
+  /** Read Beam {@link Row}s from a SingleStoreDB datasource. */
+  public static Read<Row> readRows() {
+    return new AutoValue_SingleStoreIO_Read.Builder<Row>()
+        .setRowMapper(new SingleStoreDefaultRowMapper())
+        .setOutputParallelization(true)
+        .build();
+  }
+
   /**
    * Like {@link #read}, but executes multiple instances of the query on the same table for each
    * database partition.
@@ -178,6 +190,16 @@ public class SingleStoreIO {
     return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder<T>().build();
   }
 
+  /**
+   * Like {@link #readRows}, but executes multiple instances of the query on the same table for each
+   * database partition.
+   */
+  public static ReadWithPartitions<Row> readWithPartitionsRows() {
+    return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder<Row>()
+        .setRowMapper(new SingleStoreDefaultRowMapper())
+        .build();
+  }
+
   /**
    * Write data to a SingleStoreDB datasource.
    *
@@ -187,6 +209,13 @@ public class SingleStoreIO {
     return new AutoValue_SingleStoreIO_Write.Builder<T>().build();
   }
 
+  /** Write Beam {@link Row}s to a SingleStoreDB datasource. */
+  public static Write<Row> writeRows() {
+    return new AutoValue_SingleStoreIO_Write.Builder<Row>()
+        .setUserDataMapper(new SingleStoreDefaultUserDataMapper())
+        .build();
+  }
+
   /**
    * An interface used by {@link Read} and {@link ReadWithPartitions} for converting each row of the
    * {@link ResultSet} into an element of the resulting {@link PCollection}.
@@ -196,6 +225,19 @@ public class SingleStoreIO {
     T mapRow(ResultSet resultSet) throws Exception;
   }
 
+  /**
+   * A RowMapper that requires initialization. init method is called during pipeline construction
+   * time.
+   */
+  public interface RowMapperWithInit<T> extends RowMapper<T> {
+    void init(ResultSetMetaData resultSetMetaData) throws Exception;
+  }
+
+  /** A RowMapper that provides a Coder for resulting PCollection. */
+  public interface RowMapperWithCoder<T> extends RowMapper<T> {
+    Coder<T> getCoder() throws Exception;
+  }
+
   /**
    * An interface used by the SingleStoreIO {@link Read} to set the parameters of the {@link
    * PreparedStatement}.
@@ -219,6 +261,7 @@ public class SingleStoreIO {
    * A POJO describing a SingleStoreDB {@link DataSource} by providing all properties needed to
    * create it.
    */
+  @DefaultSchema(AutoValueSchema.class)
   @AutoValue
   public abstract static class DataSourceConfiguration implements Serializable {
     abstract @Nullable String getEndpoint();
@@ -406,6 +449,15 @@ public class SingleStoreIO {
       Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is required");
       String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery());
 
+      if (rowMapper instanceof RowMapperWithInit) {
+        try {
+          ((RowMapperWithInit<?>) rowMapper)
+              .init(getResultSetMetadata(dataSourceConfiguration, actualQuery));
+        } catch (Exception e) {
+          throw new SingleStoreRowMapperInitializationException(e);
+        }
+      }
+
       Coder<T> coder =
           SingleStoreUtil.inferCoder(
               rowMapper,
@@ -432,6 +484,12 @@ public class SingleStoreIO {
       return output;
     }
 
+    public static class SingleStoreRowMapperInitializationException extends RuntimeException {
+      SingleStoreRowMapperInitializationException(Throwable cause) {
+        super("Failed to initialize RowMapper", cause);
+      }
+    }
+
     private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
       DataSourceConfiguration dataSourceConfiguration;
       String query;
@@ -595,6 +653,15 @@ public class SingleStoreIO {
 
       String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery());
 
+      if (rowMapper instanceof RowMapperWithInit) {
+        try {
+          ((RowMapperWithInit<?>) rowMapper)
+              .init(getResultSetMetadata(dataSourceConfiguration, actualQuery));
+        } catch (Exception e) {
+          throw new Read.SingleStoreRowMapperInitializationException(e);
+        }
+      }
+
       Coder<T> coder =
           SingleStoreUtil.inferCoder(
               rowMapper,
@@ -715,6 +782,28 @@ public class SingleStoreIO {
     }
   }
 
+  private static ResultSetMetaData getResultSetMetadata(
+      DataSourceConfiguration dataSourceConfiguration, String query) throws Exception {
+    DataSource dataSource = dataSourceConfiguration.getDataSource();
+    Connection conn = dataSource.getConnection();
+    try {
+      PreparedStatement stmt =
+          conn.prepareStatement(String.format("SELECT * FROM (%s) LIMIT 0", query));
+      try {
+        ResultSetMetaData md = stmt.getMetaData();
+        if (md == null) {
+          throw new Exception("ResultSetMetaData is null");
+        }
+
+        return md;
+      } finally {
+        stmt.close();
+      }
+    } finally {
+      conn.close();
+    }
+  }
+
   /**
    * A {@link PTransform} for writing data to SingleStoreDB. It is used by {@link
    * SingleStoreIO#write()}.
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
index c76b0a678c1..f64885e723f 100644
--- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
@@ -42,6 +42,14 @@ final class SingleStoreUtil {
       CoderRegistry registry,
       SchemaRegistry schemaRegistry,
       Logger log) {
+    if (rowMapper instanceof SingleStoreIO.RowMapperWithCoder) {
+      try {
+        return ((SingleStoreIO.RowMapperWithCoder<OutputT>) rowMapper).getCoder();
+      } catch (Exception e) {
+        log.warn("Unable to infer a coder from RowMapper. Attempting to infer a coder from type.");
+      }
+    }
+
     TypeDescriptor<OutputT> outputType =
         TypeDescriptors.extractFromTypeParameters(
             rowMapper,
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java
new file mode 100644
index 00000000000..6951e9b3183
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from SignleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<SingleStoreSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<SingleStoreSchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  public Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @Nullable
+  public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+  @Nullable
+  public abstract String getQuery();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract Boolean getOutputParallelization();
+
+  @Nullable
+  public abstract Boolean getWithPartitions();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);
+
+    public abstract Builder setTable(String value);
+
+    public abstract Builder setQuery(String value);
+
+    public abstract Builder setOutputParallelization(Boolean value);
+
+    public abstract Builder setWithPartitions(Boolean value);
+
+    /** Builds the {@link SingleStoreSchemaTransformReadConfiguration} configuration. */
+    public abstract SingleStoreSchemaTransformReadConfiguration build();
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..601fd1a89d1
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+    return new SingleStoreReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:singlestore_read:v1";
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+   * {@link SingleStoreSchemaTransformReadConfiguration}.
+   */
+  private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+   * SingleStoreSchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      String query = configuration.getQuery();
+      Boolean outputParallelization = configuration.getOutputParallelization();
+      Boolean withPartitions = configuration.getWithPartitions();
+
+      Preconditions.checkArgument(
+          !(outputParallelization != null && withPartitions != null && withPartitions),
+          "outputParallelization parameter is not supported for partitioned read");
+
+      if (withPartitions != null && withPartitions) {
+        SingleStoreIO.ReadWithPartitions<Row> readWithPartitions =
+            SingleStoreIO.readWithPartitionsRows();
+
+        if (dataSourceConfiguration != null) {
+          readWithPartitions =
+              readWithPartitions.withDataSourceConfiguration(dataSourceConfiguration);
+        }
+
+        if (table != null && !table.isEmpty()) {
+          readWithPartitions = readWithPartitions.withTable(table);
+        }
+
+        if (query != null && !query.isEmpty()) {
+          readWithPartitions = readWithPartitions.withQuery(query);
+        }
+
+        PCollection<Row> rows = input.getPipeline().apply(readWithPartitions);
+        Schema schema = rows.getSchema();
+
+        return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema));
+      } else {
+        SingleStoreIO.Read<Row> read = SingleStoreIO.readRows();
+
+        if (dataSourceConfiguration != null) {
+          read = read.withDataSourceConfiguration(dataSourceConfiguration);
+        }
+
+        if (table != null && !table.isEmpty()) {
+          read = read.withTable(table);
+        }
+
+        if (query != null && !query.isEmpty()) {
+          read = read.withQuery(query);
+        }
+
+        if (outputParallelization != null) {
+          read = read.withOutputParallelization(outputParallelization);
+        }
+
+        PCollection<Row> rows = input.getPipeline().apply(read);
+        Schema schema = rows.getSchema();
+
+        return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema));
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java
new file mode 100644
index 00000000000..2903035e402
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for writing to SignleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformWriteProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformWriteConfiguration {
+
+  /** Instantiates a {@link SingleStoreSchemaTransformWriteConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_SingleStoreSchemaTransformWriteConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<SingleStoreSchemaTransformWriteConfiguration>
+      TYPE_DESCRIPTOR = TypeDescriptor.of(SingleStoreSchemaTransformWriteConfiguration.class);
+  private static final SerializableFunction<SingleStoreSchemaTransformWriteConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  public Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @Nullable
+  public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract Integer getBatchSize();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);
+
+    public abstract Builder setTable(String value);
+
+    public abstract Builder setBatchSize(Integer value);
+
+    /** Builds the {@link SingleStoreSchemaTransformWriteConfiguration} configuration. */
+    public abstract SingleStoreSchemaTransformWriteConfiguration build();
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java
new file mode 100644
index 00000000000..5b68e1e05c5
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB write jobs configured
+ * using {@link SingleStoreSchemaTransformWriteConfiguration}.
+ */
+public class SingleStoreSchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<SingleStoreSchemaTransformWriteConfiguration> {
+
+  private static final String OUTPUT_TAG = "OUTPUT";
+  public static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<SingleStoreSchemaTransformWriteConfiguration> configurationClass() {
+    return SingleStoreSchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) {
+    return new SingleStoreWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:singlestore_write:v1";
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using
+   * {@link SingleStoreSchemaTransformWriteConfiguration}.
+   */
+  private static class SingleStoreWriteSchemaTransform implements SchemaTransform {
+    private final SingleStoreSchemaTransformWriteConfiguration configuration;
+
+    SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for SingleStoreDB write jobs configured using {@link
+   * SingleStoreSchemaTransformWriteConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final SingleStoreSchemaTransformWriteConfiguration configuration;
+
+    PCollectionRowTupleTransform(SingleStoreSchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+      SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+          configuration.getDataSourceConfiguration();
+      String table = configuration.getTable();
+      Integer batchSize = configuration.getBatchSize();
+
+      SingleStoreIO.Write<Row> write = SingleStoreIO.writeRows();
+
+      if (dataSourceConfiguration != null) {
+        write = write.withDataSourceConfiguration(dataSourceConfiguration);
+      }
+
+      if (table != null && !table.isEmpty()) {
+        write = write.withTable(table);
+      }
+
+      if (batchSize != null) {
+        write = write.withBatchSize(batchSize);
+      }
+
+      PCollection<Integer> res = input.get(INPUT_TAG).apply(write);
+      Schema.Builder schemaBuilder = new Schema.Builder();
+      schemaBuilder.addField("rowsWritten", Schema.FieldType.INT32);
+      Schema schema = schemaBuilder.build();
+      return PCollectionRowTuple.of(
+          OUTPUT_TAG,
+          res.apply(
+                  MapElements.into(TypeDescriptor.of(Row.class))
+                      .via(
+                          (SerializableFunction<Integer, Row>)
+                              a -> {
+                                Row.Builder rowBuilder = Row.withSchema(schema);
+                                rowBuilder.addValue(a);
+                                return rowBuilder.build();
+                              }))
+              .setRowSchema(schema));
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java
new file mode 100644
index 00000000000..d71e5eb209e
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** SingleStoreIO SchemaTransforms. */
+package org.apache.beam.sdk.io.singlestore.schematransform;
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java
new file mode 100644
index 00000000000..54de09a4e95
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NULL;
+import static java.sql.Types.REAL;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.singlestore.jdbc.client.result.ResultSetMetaData;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test DefaultRowMapper. */
+@RunWith(JUnit4.class)
+public class SingleStoreDefaultRowMapperTest {
+  @Test
+  public void testEmptyRow() throws Exception {
+    ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+    Mockito.when(md.getColumnCount()).thenReturn(0);
+    ResultSet res = Mockito.mock(ResultSet.class);
+
+    SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+    mapper.init(md);
+    Schema s = mapper.getCoder().getSchema();
+    Row r = mapper.mapRow(res);
+
+    assertEquals(0, s.getFieldCount());
+    assertEquals(0, r.getFieldCount());
+  }
+
+  @Test
+  public void testAllDataTypes() throws Exception {
+    ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+
+    Mockito.when(md.getColumnCount()).thenReturn(17);
+
+    Mockito.when(md.getColumnType(1)).thenReturn(BIT);
+    Mockito.when(md.getColumnType(2)).thenReturn(TINYINT);
+    Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT);
+    Mockito.when(md.getColumnType(4)).thenReturn(INTEGER);
+    Mockito.when(md.getColumnType(5)).thenReturn(BIGINT);
+    Mockito.when(md.getColumnType(6)).thenReturn(REAL);
+    Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE);
+    Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL);
+    Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP);
+    Mockito.when(md.getColumnType(10)).thenReturn(DATE);
+    Mockito.when(md.getColumnType(11)).thenReturn(TIME);
+    Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY);
+    Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY);
+    Mockito.when(md.getColumnType(14)).thenReturn(BINARY);
+    Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR);
+    Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR);
+    Mockito.when(md.getColumnType(17)).thenReturn(CHAR);
+
+    for (int i = 12; i <= 17; i++) {
+      Mockito.when(md.getPrecision(i)).thenReturn(10);
+    }
+
+    for (int i = 1; i <= md.getColumnCount(); i++) {
+      Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i);
+      Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNoNulls);
+    }
+
+    ResultSet res = Mockito.mock(ResultSet.class);
+
+    Mockito.when(res.getBoolean(1)).thenReturn(true);
+    Mockito.when(res.getByte(2)).thenReturn((byte) 10);
+    Mockito.when(res.getShort(3)).thenReturn((short) 10);
+    Mockito.when(res.getInt(4)).thenReturn(10);
+    Mockito.when(res.getLong(5)).thenReturn((long) 10);
+    Mockito.when(res.getFloat(6)).thenReturn((float) 10.1);
+    Mockito.when(res.getDouble(7)).thenReturn(10.1);
+    Mockito.when(res.getBigDecimal(8)).thenReturn(new BigDecimal("100.100"));
+    Mockito.when(res.getTimestamp(Mockito.eq(9), Mockito.any()))
+        .thenReturn(Timestamp.valueOf("2022-10-10 10:10:10"));
+    Mockito.when(res.getObject(10, java.time.LocalDate.class))
+        .thenReturn(LocalDate.of(2022, 10, 10));
+    Mockito.when(res.getTime(Mockito.eq(11), Mockito.any())).thenReturn(Time.valueOf("10:10:10"));
+    Mockito.when(res.getBytes(12)).thenReturn("asd".getBytes(StandardCharsets.UTF_8));
+    Mockito.when(res.getBytes(13)).thenReturn("asd".getBytes(StandardCharsets.UTF_8));
+    Mockito.when(res.getBytes(14)).thenReturn("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8));
+    Mockito.when(res.getString(15)).thenReturn("asd");
+    Mockito.when(res.getString(16)).thenReturn("asd");
+    Mockito.when(res.getString(17)).thenReturn("asd");
+
+    Mockito.when(res.wasNull()).thenReturn(false);
+
+    SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+    mapper.init(md);
+    Schema s = mapper.getCoder().getSchema();
+    Row r = mapper.mapRow(res);
+
+    assertEquals(17, s.getFieldCount());
+    for (int i = 0; i < s.getFieldCount(); i++) {
+      assertEquals("c" + (i + 1), s.getField(i).getName());
+    }
+
+    assertEquals(Schema.FieldType.BOOLEAN, s.getField(0).getType());
+    assertEquals(Schema.FieldType.BYTE, s.getField(1).getType());
+    assertEquals(Schema.FieldType.INT16, s.getField(2).getType());
+    assertEquals(Schema.FieldType.INT32, s.getField(3).getType());
+    assertEquals(Schema.FieldType.INT64, s.getField(4).getType());
+    assertEquals(Schema.FieldType.FLOAT, s.getField(5).getType());
+    assertEquals(Schema.FieldType.DOUBLE, s.getField(6).getType());
+    assertEquals(Schema.FieldType.DECIMAL, s.getField(7).getType());
+    assertEquals(Schema.FieldType.DATETIME, s.getField(8).getType());
+    assertEquals(Schema.FieldType.DATETIME, s.getField(9).getType());
+    assertEquals(Schema.FieldType.DATETIME, s.getField(10).getType());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(11).getType().getTypeName());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(12).getType().getTypeName());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(13).getType().getTypeName());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(14).getType().getTypeName());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(15).getType().getTypeName());
+    assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(16).getType().getTypeName());
+    assertEquals(Schema.FieldType.BYTES, s.getField(11).getType().getLogicalType().getBaseType());
+    assertEquals(Schema.FieldType.BYTES, s.getField(12).getType().getLogicalType().getBaseType());
+    assertEquals(Schema.FieldType.BYTES, s.getField(13).getType().getLogicalType().getBaseType());
+    assertEquals(Schema.FieldType.STRING, s.getField(14).getType().getLogicalType().getBaseType());
+    assertEquals(Schema.FieldType.STRING, s.getField(15).getType().getLogicalType().getBaseType());
+    assertEquals(Schema.FieldType.STRING, s.getField(16).getType().getLogicalType().getBaseType());
+
+    assertEquals(17, r.getFieldCount());
+
+    assertEquals(true, r.getBoolean(0));
+    assertEquals((Byte) (byte) 10, r.getByte(1));
+    assertEquals((Short) (short) 10, r.getInt16(2));
+    assertEquals((Integer) 10, r.getInt32(3));
+    assertEquals((Long) (long) 10, r.getInt64(4));
+    assertEquals((Float) (float) 10.1, r.getFloat(5));
+    assertEquals((Double) 10.1, r.getDouble(6));
+    assertEquals(new BigDecimal("100.100"), r.getDecimal(7));
+    assertEquals(0, new DateTime("2022-10-10T10:10:10").compareTo(r.getDateTime(8)));
+    assertEquals(0, new DateTime("2022-10-10T00:00:00Z").compareTo(r.getDateTime(9)));
+    assertEquals(0, new DateTime("1970-01-01T10:10:10").compareTo(r.getDateTime(10)));
+    assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11));
+    assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12));
+    assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13));
+    assertEquals("asd", r.getString(14));
+    assertEquals("asd", r.getString(15));
+    assertEquals("asd", r.getString(16));
+  }
+
+  @Test
+  public void testNullValues() throws Exception {
+    ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+
+    Mockito.when(md.getColumnCount()).thenReturn(18);
+
+    Mockito.when(md.getColumnType(1)).thenReturn(BIT);
+    Mockito.when(md.getColumnType(2)).thenReturn(TINYINT);
+    Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT);
+    Mockito.when(md.getColumnType(4)).thenReturn(INTEGER);
+    Mockito.when(md.getColumnType(5)).thenReturn(BIGINT);
+    Mockito.when(md.getColumnType(6)).thenReturn(REAL);
+    Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE);
+    Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL);
+    Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP);
+    Mockito.when(md.getColumnType(10)).thenReturn(DATE);
+    Mockito.when(md.getColumnType(11)).thenReturn(TIME);
+    Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY);
+    Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY);
+    Mockito.when(md.getColumnType(14)).thenReturn(BINARY);
+    Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR);
+    Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR);
+    Mockito.when(md.getColumnType(17)).thenReturn(CHAR);
+    Mockito.when(md.getColumnType(18)).thenReturn(NULL);
+
+    for (int i = 1; i <= md.getColumnCount(); i++) {
+      Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i);
+      Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNullable);
+    }
+
+    ResultSet res = Mockito.mock(ResultSet.class);
+
+    Mockito.when(res.getBoolean(1)).thenReturn(false);
+    Mockito.when(res.getByte(2)).thenReturn((byte) 0);
+    Mockito.when(res.getShort(3)).thenReturn((short) 0);
+    Mockito.when(res.getInt(4)).thenReturn(0);
+    Mockito.when(res.getLong(5)).thenReturn((long) 0);
+    Mockito.when(res.getFloat(6)).thenReturn((float) 0);
+    Mockito.when(res.getDouble(7)).thenReturn(0.0);
+    Mockito.when(res.getBigDecimal(8)).thenReturn(null);
+    Mockito.when(res.getTimestamp(9)).thenReturn(null);
+    Mockito.when(res.getDate(10)).thenReturn(null);
+    Mockito.when(res.getTime(11)).thenReturn(null);
+    Mockito.when(res.getBytes(12)).thenReturn(null);
+    Mockito.when(res.getBytes(13)).thenReturn(null);
+    Mockito.when(res.getBytes(14)).thenReturn(null);
+    Mockito.when(res.getString(15)).thenReturn(null);
+    Mockito.when(res.getString(16)).thenReturn(null);
+    Mockito.when(res.getString(17)).thenReturn(null);
+    Mockito.when(res.getString(18)).thenReturn(null);
+
+    Mockito.when(res.wasNull()).thenReturn(true);
+
+    SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+    mapper.init(md);
+    Schema s = mapper.getCoder().getSchema();
+    Row r = mapper.mapRow(res);
+
+    assertEquals(18, s.getFieldCount());
+    for (int i = 0; i < s.getFieldCount(); i++) {
+      assertEquals("c" + (i + 1), s.getField(i).getName());
+    }
+
+    assertEquals(18, r.getFieldCount());
+    for (int i = 0; i < r.getFieldCount(); i++) {
+      assertNull(r.getValue(i));
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
new file mode 100644
index 00000000000..6472ad13cff
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test DefaultUserDataMapper. */
+@RunWith(JUnit4.class)
+public class SingleStoreDefaultUserDataMapperTest {
+  @Test
+  public void testNullValues() {}
+
+  @Test
+  public void testBigNumbers() {}
+
+  @Test
+  public void testBigNegativeNumbers() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+    schemaBuilder.addField("int16", Schema.FieldType.INT16);
+    schemaBuilder.addField("int32", Schema.FieldType.INT32);
+    schemaBuilder.addField("int64", Schema.FieldType.INT64);
+    schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    rowBuilder.addValue(Byte.MIN_VALUE);
+    rowBuilder.addValue(Short.MIN_VALUE);
+    rowBuilder.addValue(Integer.MIN_VALUE);
+    rowBuilder.addValue(Long.MIN_VALUE);
+    rowBuilder.addValue(-Float.MAX_VALUE);
+    rowBuilder.addValue(-Double.MAX_VALUE);
+    rowBuilder.addValue(new BigDecimal("-10000000000000.1000000000000000000000"));
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(7, res.size());
+    assertEquals("-128", res.get(0));
+    assertEquals("-32768", res.get(1));
+    assertEquals("-2147483648", res.get(2));
+    assertEquals("-9223372036854775808", res.get(3));
+    assertEquals("-3.4028235E38", res.get(4));
+    assertEquals("-1.7976931348623157E308", res.get(5));
+    assertEquals("-10000000000000.1000000000000000000000", res.get(6));
+  }
+
+  @Test
+  public void testEmptyRow() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(0, res.size());
+  }
+
+  @Test
+  public void testAllDataTypes() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+    schemaBuilder.addField("int16", Schema.FieldType.INT16);
+    schemaBuilder.addField("int32", Schema.FieldType.INT32);
+    schemaBuilder.addField("int64", Schema.FieldType.INT64);
+    schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+    schemaBuilder.addField("boolean", Schema.FieldType.BOOLEAN);
+    schemaBuilder.addField("datetime", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("bytes", Schema.FieldType.BYTES);
+    schemaBuilder.addField("string", Schema.FieldType.STRING);
+    Schema schema = schemaBuilder.build();
+
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    rowBuilder.addValue((byte) 10);
+    rowBuilder.addValue((short) 10);
+    rowBuilder.addValue(10);
+    rowBuilder.addValue((long) 10);
+    rowBuilder.addValue((float) 10.1);
+    rowBuilder.addValue(10.1);
+    rowBuilder.addValue(new BigDecimal("10.1"));
+    rowBuilder.addValue(false);
+    rowBuilder.addValue(new DateTime("2022-01-01T10:10:10.012Z"));
+    rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+    rowBuilder.addValue("asd");
+    Row row = rowBuilder.build();
+
+    SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+    List<String> res = mapper.mapRow(row);
+
+    assertEquals(11, res.size());
+    assertEquals("10", res.get(0));
+    assertEquals("10", res.get(1));
+    assertEquals("10", res.get(2));
+    assertEquals("10", res.get(3));
+    assertEquals("10.1", res.get(4));
+    assertEquals("10.1", res.get(5));
+    assertEquals("10.1", res.get(6));
+    assertEquals("0", res.get(7));
+    assertEquals("2022-01-01 10:10:10.012", res.get(8));
+    assertEquals("asd", res.get(9));
+    assertEquals("asd", res.get(10));
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
new file mode 100644
index 00000000000..32fc9d1a11f
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIODefaultMapperIT {
+  private static final String DATABASE_NAME = "SingleStoreIOIT";
+
+  private static int numberOfRows;
+
+  private static String tableName;
+
+  private static String serverName;
+
+  private static String username;
+
+  private static String password;
+
+  private static Integer port;
+
+  private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
+
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setup() {
+    SingleStoreIOTestPipelineOptions options;
+    try {
+      options = readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+
+    numberOfRows = options.getNumberOfRecords();
+    serverName = options.getSingleStoreServerName();
+    username = options.getSingleStoreUsername();
+    password = options.getSingleStorePassword();
+    port = options.getSingleStorePort();
+    tableName = DatabaseTestHelper.getTestTableName("IT");
+    dataSourceConfiguration =
+        SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+            .withDatabase(DATABASE_NAME)
+            .withPassword(password)
+            .withUsername(username);
+
+    generateSchema();
+  }
+
+  private static void generateSchema() {
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("c1", Schema.FieldType.BOOLEAN);
+    schemaBuilder.addField("c2", Schema.FieldType.BYTE);
+    schemaBuilder.addField("c3", Schema.FieldType.INT16);
+    schemaBuilder.addField("c4", Schema.FieldType.INT32);
+    schemaBuilder.addField("c5", Schema.FieldType.INT64);
+    schemaBuilder.addField("c6", Schema.FieldType.FLOAT);
+    schemaBuilder.addField("c7", Schema.FieldType.DOUBLE);
+    schemaBuilder.addField("c8", Schema.FieldType.DECIMAL);
+    schemaBuilder.addField("c9", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c10", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c11", Schema.FieldType.DATETIME);
+    schemaBuilder.addField("c12", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c13", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c14", Schema.FieldType.BYTES);
+    schemaBuilder.addField("c15", Schema.FieldType.STRING);
+    schemaBuilder.addField("c16", Schema.FieldType.STRING);
+    schemaBuilder.addField("c17", Schema.FieldType.STRING);
+
+    schema = schemaBuilder.build();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteThenRead() throws Exception {
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
+    createTable();
+
+    try {
+      PipelineResult writeResult = runWrite();
+      assertEquals(PipelineResult.State.DONE, writeResult.waitUntilFinish());
+      PipelineResult readResult = runRead();
+      assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
+    } finally {
+      DataSource dataSource = dataSourceConfiguration.getDataSource();
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  private void createTable() throws SQLException {
+    DataSource dataSource = dataSourceConfiguration.getDataSource();
+    Connection conn = dataSource.getConnection();
+    try {
+      Statement stmt = conn.createStatement();
+      stmt.executeUpdate("DROP TABLE IF EXISTS " + tableName);
+      stmt.executeUpdate(
+          "CREATE TABLE "
+              + tableName
+              + "("
+              + "c1 BIT, "
+              + "c2 TINYINT, "
+              + "c3 SMALLINT, "
+              + "c4 INTEGER, "
+              + "c5 BIGINT, "
+              + "c6 FLOAT, "
+              + "c7 DOUBLE, "
+              + "c8 DECIMAL(10, 5), "
+              + "c9 TIMESTAMP, "
+              + "c10 DATE, "
+              + "c11 TIME, "
+              + "c12 BLOB, "
+              + "c13 TINYBLOB, "
+              + "c14 BINARY(10), "
+              + "c15 MEDIUMTEXT, "
+              + "c16 TINYTEXT, "
+              + "c17 CHAR(10) "
+              + ")");
+    } finally {
+      conn.close();
+    }
+  }
+
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+  public static class ConstructRowFn extends DoFn<Long, Row> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      Row.Builder rowBuilder = Row.withSchema(schema);
+      rowBuilder.addValue(Boolean.TRUE);
+      rowBuilder.addValue((byte) 10);
+      rowBuilder.addValue((short) 10);
+      rowBuilder.addValue(10);
+      rowBuilder.addValue((long) 10);
+      rowBuilder.addValue((float) 10.1);
+      rowBuilder.addValue(10.1);
+      rowBuilder.addValue(new BigDecimal("10.1"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+      rowBuilder.addValue("asd");
+      rowBuilder.addValue("asd");
+      rowBuilder.addValue("asd");
+      Row row = rowBuilder.build();
+
+      c.output(row);
+    }
+  }
+
+  private PipelineResult runWrite() {
+    pipelineWrite
+        .apply(GenerateSequence.from(0).to(numberOfRows))
+        .apply(ParDo.of(new ConstructRowFn()))
+        .setRowSchema(schema)
+        .apply(
+            SingleStoreIO.writeRows()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .withTable(tableName));
+
+    return pipelineWrite.run();
+  }
+
+  private PipelineResult runRead() {
+    PCollection<Row> res =
+        pipelineRead.apply(
+            SingleStoreIO.readRows()
+                .withDataSourceConfiguration(dataSourceConfiguration)
+                .withTable(tableName));
+
+    PAssert.thatSingleton(res.apply("Count All", Count.globally())).isEqualTo((long) numberOfRows);
+
+    res.apply(ParDo.of(new CheckDoFn())).setCoder(VoidCoder.of());
+
+    return pipelineRead.run();
+  }
+
+  private static class CheckDoFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void process(ProcessContext c) {
+      Row r = c.element();
+      Assert.assertNotNull(r);
+      assertEquals(Boolean.TRUE, r.getBoolean(0));
+      assertEquals((Byte) (byte) 10, r.getByte(1));
+      assertEquals((Short) (short) 10, r.getInt16(2));
+      assertEquals((Integer) 10, r.getInt32(3));
+      assertEquals((Long) (long) 10, r.getInt64(4));
+      assertEquals((Float) (float) 10.1, r.getFloat(5));
+      assertEquals((Double) 10.1, r.getDouble(6));
+      assertEquals(new BigDecimal("10.10000"), r.getDecimal(7));
+      assertEquals(0, new DateTime("2022-01-01T10:10:10Z").compareTo(r.getDateTime(8)));
+      assertEquals(0, new DateTime("2022-01-01T00:00:00Z").compareTo(r.getDateTime(9)));
+      assertEquals(0, new DateTime("1970-01-01T10:10:10Z").compareTo(r.getDateTime(10)));
+      assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11));
+      assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12));
+      assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13));
+      assertEquals("asd", r.getString(14));
+      assertEquals("asd", r.getString(15));
+      assertEquals("asd", r.getString(16));
+    }
+  }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
similarity index 92%
copy from sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
copy to sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
index cdbcacd0443..8adf8a47dc1 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
@@ -22,9 +22,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.cloud.Timestamp;
 import com.singlestore.jdbc.SingleStoreDataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -58,9 +55,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class SingleStoreIOIT {
+public class SingleStoreIOPerformanceIT {
 
-  private static final String NAMESPACE = SingleStoreIOIT.class.getName();
+  private static final String NAMESPACE = SingleStoreIOPerformanceIT.class.getName();
 
   private static final String DATABASE_NAME = "SingleStoreIOIT";
 
@@ -109,22 +106,10 @@ public class SingleStoreIOIT {
             .get();
   }
 
-  void createDatabaseIfNotExists() throws SQLException {
-    DataSource dataSource =
-        new SingleStoreDataSource(
-            String.format(
-                "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
-                serverName, port, username, password));
-    try (Connection conn = dataSource.getConnection();
-        Statement stmt = conn.createStatement()) {
-      stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_NAME));
-    }
-  }
-
   @Test
   @Category(NeedsRunner.class)
   public void testWriteThenRead() throws Exception {
-    createDatabaseIfNotExists();
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
     DataSource dataSource =
         new SingleStoreDataSource(
             String.format(
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
similarity index 53%
rename from sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
rename to sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
index cdbcacd0443..4ded4cd452a 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
@@ -19,37 +19,38 @@ package org.apache.beam.sdk.io.singlestore;
 
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import com.google.cloud.Timestamp;
 import com.singlestore.jdbc.SingleStoreDataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadProvider;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testutils.NamedTestResult;
-import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
-import org.apache.beam.sdk.testutils.metrics.MetricsReader;
-import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
-import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,9 +59,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class SingleStoreIOIT {
-
-  private static final String NAMESPACE = SingleStoreIOIT.class.getName();
+public class SingleStoreIOSchemaTransformIT {
 
   private static final String DATABASE_NAME = "SingleStoreIOIT";
 
@@ -78,8 +77,6 @@ public class SingleStoreIOIT {
 
   private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
 
-  private static InfluxDBSettings settings;
-
   @BeforeClass
   public static void setup() {
     SingleStoreIOTestPipelineOptions options;
@@ -101,30 +98,12 @@ public class SingleStoreIOIT {
             .withDatabase(DATABASE_NAME)
             .withPassword(password)
             .withUsername(username);
-    settings =
-        InfluxDBSettings.builder()
-            .withHost(options.getInfluxHost())
-            .withDatabase(options.getInfluxDatabase())
-            .withMeasurement(options.getInfluxMeasurement())
-            .get();
-  }
-
-  void createDatabaseIfNotExists() throws SQLException {
-    DataSource dataSource =
-        new SingleStoreDataSource(
-            String.format(
-                "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
-                serverName, port, username, password));
-    try (Connection conn = dataSource.getConnection();
-        Statement stmt = conn.createStatement()) {
-      stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_NAME));
-    }
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteThenRead() throws Exception {
-    createDatabaseIfNotExists();
+    TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
     DataSource dataSource =
         new SingleStoreDataSource(
             String.format(
@@ -138,76 +117,64 @@ public class SingleStoreIOIT {
       assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
       PipelineResult readResultWithPartitions = runReadWithPartitions();
       assertEquals(PipelineResult.State.DONE, readResultWithPartitions.waitUntilFinish());
-      gatherAndPublishMetrics(writeResult, readResult, readResultWithPartitions);
     } finally {
       DatabaseTestHelper.deleteTable(dataSource, tableName);
     }
   }
 
-  private void gatherAndPublishMetrics(
-      PipelineResult writeResult,
-      PipelineResult readResult,
-      PipelineResult readResultWithPartitions) {
-    String uuid = UUID.randomUUID().toString();
-    String timestamp = Timestamp.now().toString();
-
-    IOITMetrics writeMetrics =
-        new IOITMetrics(
-            getMetricSuppliers(uuid, timestamp, "write_time"),
-            writeResult,
-            NAMESPACE,
-            uuid,
-            timestamp);
-    writeMetrics.publishToInflux(settings);
-
-    IOITMetrics readMetrics =
-        new IOITMetrics(
-            getMetricSuppliers(uuid, timestamp, "read_time"),
-            readResult,
-            NAMESPACE,
-            uuid,
-            timestamp);
-    readMetrics.publishToInflux(settings);
-
-    IOITMetrics readMetricsWithPartitions =
-        new IOITMetrics(
-            getMetricSuppliers(uuid, timestamp, "read_with_partitions_time"),
-            readResultWithPartitions,
-            NAMESPACE,
-            uuid,
-            timestamp);
-    readMetricsWithPartitions.publishToInflux(settings);
-  }
-
-  private Set<Function<MetricsReader, NamedTestResult>> getMetricSuppliers(
-      String uuid, String timestamp, String metricName) {
-    Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
-
-    suppliers.add(
-        reader -> {
-          long writeStart = reader.getStartTimeMetric(metricName);
-          long writeEnd = reader.getEndTimeMetric(metricName);
-          return NamedTestResult.create(uuid, timestamp, metricName, (writeEnd - writeStart) / 1e3);
-        });
-
-    return suppliers;
-  }
-
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
   @Rule public TestPipeline pipelineReadWithPartitions = TestPipeline.create();
 
   private PipelineResult runWrite() {
-    PCollection<Integer> writtenRows =
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformWriteProvider();
+
+    SingleStoreSchemaTransformWriteConfiguration configuration =
+        SingleStoreSchemaTransformWriteConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setBatchSize(100)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    Schema.Builder schemaBuilder = new Schema.Builder();
+    schemaBuilder.addField("id", Schema.FieldType.INT32);
+    schemaBuilder.addField("name", Schema.FieldType.STRING);
+    Schema schema = schemaBuilder.build();
+
+    PCollection<Row> rows =
         pipelineWrite
             .apply(GenerateSequence.from(0).to(numberOfRows))
             .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
             .apply(
-                SingleStoreIO.<TestRow>write()
-                    .withDataSourceConfiguration(dataSourceConfiguration)
-                    .withTable(tableName)
-                    .withUserDataMapper(new TestHelper.TestUserDataMapper()));
+                "Convert TestRows to Rows",
+                MapElements.into(TypeDescriptor.of(Row.class))
+                    .via(
+                        (SerializableFunction<TestRow, Row>)
+                            testRow -> {
+                              Row.Builder rowBuilder = Row.withSchema(schema);
+                              rowBuilder.addValue(testRow.id());
+                              rowBuilder.addValue(testRow.name());
+                              return rowBuilder.build();
+                            }))
+            .setRowSchema(schema);
+
+    PCollectionRowTuple input =
+        PCollectionRowTuple.of(SingleStoreSchemaTransformWriteProvider.INPUT_TAG, rows);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
+    PCollection<Integer> writtenRows =
+        output
+            .get(tag)
+            .apply(
+                "Convert Rows to Integers",
+                MapElements.into(TypeDescriptor.of(Integer.class))
+                    .via((SerializableFunction<Row, Integer>) row -> row.getInt32(0)));
 
     PAssert.thatSingleton(writtenRows.apply("Sum All", Sum.integersGlobally()))
         .isEqualTo(numberOfRows);
@@ -216,14 +183,32 @@ public class SingleStoreIOIT {
   }
 
   private PipelineResult runRead() {
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+    SingleStoreSchemaTransformReadConfiguration configuration =
+        SingleStoreSchemaTransformReadConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setOutputParallelization(true)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
     PCollection<TestRow> namesAndIds =
-        pipelineRead
+        output
+            .get(tag)
             .apply(
-                SingleStoreIO.<TestRow>read()
-                    .withDataSourceConfiguration(dataSourceConfiguration)
-                    .withTable(tableName)
-                    .withRowMapper(new TestHelper.TestRowMapper()))
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")));
+                MapElements.into(TypeDescriptor.of(TestRow.class))
+                    .via(
+                        (SerializableFunction<Row, TestRow>)
+                            row -> TestRow.create(row.getInt32(0), row.getString(1))));
 
     testReadResult(namesAndIds);
 
@@ -231,14 +216,32 @@ public class SingleStoreIOIT {
   }
 
   private PipelineResult runReadWithPartitions() {
+    SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+    SingleStoreSchemaTransformReadConfiguration configuration =
+        SingleStoreSchemaTransformReadConfiguration.builder()
+            .setDataSourceConfiguration(dataSourceConfiguration)
+            .setTable(tableName)
+            .setWithPartitions(true)
+            .build();
+
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+        schemaTransform.buildTransform();
+
+    PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
     PCollection<TestRow> namesAndIds =
-        pipelineReadWithPartitions
+        output
+            .get(tag)
             .apply(
-                SingleStoreIO.<TestRow>readWithPartitions()
-                    .withDataSourceConfiguration(dataSourceConfiguration)
-                    .withTable(tableName)
-                    .withRowMapper(new TestHelper.TestRowMapper()))
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_with_partitions_time")));
+                MapElements.into(TypeDescriptor.of(TestRow.class))
+                    .via(
+                        (SerializableFunction<Row, TestRow>)
+                            row -> TestRow.create(row.getInt32(0), row.getString(1))));
 
     testReadResult(namesAndIds);
 
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
index 4b743e43cd1..6f50d419368 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
@@ -79,6 +79,23 @@ public class SingleStoreUtilTest {
     }
   }
 
+  private static class TestRowMapperWithCoder extends TestRowMapper
+      implements SingleStoreIO.RowMapperWithCoder<TestRow> {
+    @Override
+    public Coder<TestRow> getCoder() throws Exception {
+      return SerializableCoder.of(TestRow.class);
+    }
+  }
+
+  @Test
+  public void testInferCoderFromRowMapper() {
+    SchemaRegistry sr = SchemaRegistry.createDefault();
+    CoderRegistry cr = CoderRegistry.createDefault();
+    Coder<TestRow> c = SerializableCoder.of(TestRow.class);
+
+    assertEquals(c, SingleStoreUtil.inferCoder(new TestRowMapperWithCoder(), cr, sr, LOG));
+  }
+
   @Test
   public void testInferCoderFromSchemaRegistry() {
     SchemaRegistry sr = SchemaRegistry.createDefault();
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
index 7c0acc53909..e04785db4ee 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.sdk.io.singlestore;
 
+import com.singlestore.jdbc.SingleStoreDataSource;
+import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import javax.sql.DataSource;
 import org.apache.beam.sdk.io.common.TestRow;
 
 public class TestHelper {
@@ -67,4 +72,18 @@ public class TestHelper {
       return "secretPass";
     }
   }
+
+  public static void createDatabaseIfNotExists(
+      String serverName, Integer port, String username, String password, String databaseName)
+      throws SQLException {
+    DataSource dataSource =
+        new SingleStoreDataSource(
+            String.format(
+                "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
+                serverName, port, username, password));
+    try (Connection conn = dataSource.getConnection();
+        Statement stmt = conn.createStatement()) {
+      stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName));
+    }
+  }
 }