Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/19 21:55:28 UTC
[beam] branch master updated: Implemented SchemaTransforms for SingleStoreIO (#24290)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 44aee66979d Implemented SchemaTransforms for SingleStoreIO (#24290)
44aee66979d is described below
commit 44aee66979db73e7a0cda4a4b668bf69f8b4d0c6
Author: AdalbertMemSQL <55...@users.noreply.github.com>
AuthorDate: Mon Dec 19 23:55:20 2022 +0200
Implemented SchemaTransforms for SingleStoreIO (#24290)
* Implemented SchemaTransforms for SingleStoreIO
Added default RowMapper and UserDataMapper
These changes make SingleStoreIO easier to configure and allow it to be used from other languages
* Fixed nullable errors
* Changed imports to avoid the .* form
* Changed formatter field to be transient
* Nit reformatting
* Fixed bugs in tests
* Moved schema transform classes to a separate folder
* Removed unused imports
* Added package-info file
* Checkpoint
* Checkpoint
* Resolved comments
Added DefaultSchema for DataSourceConfiguration
Changed URNs
Added checks for empty strings
Deleted the ReadWithPartitions schema transform and added a withPartitions option to the Read schema transform
* Changed indentation
* Fixed build by adding a cast
* Reformatted code
* Added an assertion that convertLogicalTypeFieldToString is called only with logical types
* Refactored code to delete ReadRows and ReadRowsWithPartitions classes
* Update .test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
Co-authored-by: Ahmed Abualsaud <65...@users.noreply.github.com>
* Fixed bug where env variable name was used instead of the value
* Changed to use checkArgument instead of assert
* Added appropriate error message
Co-authored-by: Ahmed Abualsaud <65...@users.noreply.github.com>
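As a quick orientation, here is a minimal sketch of how the new Row-based entry points added in this commit (readRows, writeRows) are meant to be used. The endpoint, credentials, and table names are placeholders, and DataSourceConfiguration.create/withUsername/withPassword/withDatabase follow the pre-existing SingleStoreIO API:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class SingleStoreRowsExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Placeholder connection settings.
        SingleStoreIO.DataSourceConfiguration config =
            SingleStoreIO.DataSourceConfiguration.create("hostname:3306")
                .withUsername("admin")
                .withPassword("secretpass")
                .withDatabase("db");

        // Read Beam Rows without writing a custom RowMapper: the schema is
        // inferred from the query's ResultSetMetaData at pipeline construction.
        PCollection<Row> rows =
            pipeline.apply(
                SingleStoreIO.readRows()
                    .withDataSourceConfiguration(config)
                    .withQuery("SELECT * FROM source_table"));

        // Write Rows back without a custom UserDataMapper.
        rows.apply(
            SingleStoreIO.writeRows()
                .withDataSourceConfiguration(config)
                .withTable("destination_table"));

        pipeline.run().waitUntilFinish();
      }
    }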
---
.../job_PerformanceTests_SingleStoreIO.groovy | 2 +-
.../job_PostCommit_Java_SingleStoreIO_IT.groovy | 86 ++++++
.../singlestore/SingleStoreDefaultRowMapper.java | 287 +++++++++++++++++++++
.../SingleStoreDefaultUserDataMapper.java | 114 ++++++++
.../beam/sdk/io/singlestore/SingleStoreIO.java | 89 +++++++
.../beam/sdk/io/singlestore/SingleStoreUtil.java | 8 +
...ingleStoreSchemaTransformReadConfiguration.java | 85 ++++++
.../SingleStoreSchemaTransformReadProvider.java | 173 +++++++++++++
...ngleStoreSchemaTransformWriteConfiguration.java | 75 ++++++
.../SingleStoreSchemaTransformWriteProvider.java | 156 +++++++++++
.../singlestore/schematransform/package-info.java | 20 ++
.../SingleStoreDefaultRowMapperTest.java | 255 ++++++++++++++++++
.../SingleStoreDefaultUserDataMapperTest.java | 136 ++++++++++
.../singlestore/SingleStoreIODefaultMapperIT.java | 252 ++++++++++++++++++
...reIOIT.java => SingleStoreIOPerformanceIT.java} | 21 +-
...IT.java => SingleStoreIOSchemaTransformIT.java} | 213 +++++++--------
.../sdk/io/singlestore/SingleStoreUtilTest.java | 17 ++
.../apache/beam/sdk/io/singlestore/TestHelper.java | 19 ++
18 files changed, 1884 insertions(+), 124 deletions(-)
diff --git a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
index a5fbc278a24..2d6df33f4f1 100644
--- a/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_SingleStoreIO.groovy
@@ -80,7 +80,7 @@ job(jobName) {
switches("--info")
switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
switches("-DintegrationTestRunner=dataflow")
- tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOIT")
+ tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOITPerformance")
}
}
}
diff --git a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
new file mode 100644
index 00000000000..f43da5a0531
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import PostcommitJobBuilder
+import Kubernetes
+
+String jobName = "beam_PostCommit_Java_SingleStoreIO_IT"
+
+void waitForPodWithLabel(job, Kubernetes k8s, String label) {
+ job.steps {
+ shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for-pod-with-label.sh ${label} 600")
+ }
+}
+
+void waitFor(job, Kubernetes k8s, String resource) {
+ job.steps {
+ shell("${k8s.KUBERNETES_DIR}/singlestore/wait-for.sh ${resource} 600")
+ }
+}
+
+
+// This job runs the integration tests of the Java SingleStoreIO class.
+PostcommitJobBuilder.postCommitJob(jobName,
+ 'Run Java SingleStoreIO_IT', 'Java SingleStoreIO Integration Test', this) {
+ description('Runs the Java SingleStoreIO Integration Test.')
+
+ // Set common parameters.
+ common.setTopLevelMainJobProperties(delegate)
+
+ // Deploy SingleStoreDB cluster
+ String namespace = common.getKubernetesNamespace(jobName)
+ String kubeconfigPath = common.getKubeconfigLocationForNamespace(namespace)
+ Kubernetes k8s = Kubernetes.create(delegate, kubeconfigPath, namespace)
+
+ k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-rbac.yaml"))
+ k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster-crd.yaml"))
+ k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-operator.yaml"))
+ waitForPodWithLabel(delegate, k8s, "sdb-operator")
+
+ k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/singlestore/sdb-cluster.yaml"))
+ waitFor(delegate, k8s, "memsqlclusters.memsql.com")
+
+ String singlestoreHostName = "LOAD_BALANCER_IP"
+ k8s.loadBalancerIP("svc-sdb-cluster-ddl", singlestoreHostName)
+
+ // Define test options
+ Map pipelineOptions = [
+ tempRoot : 'gs://temp-storage-for-perf-tests',
+ project : 'apache-beam-testing',
+ runner : 'DataflowRunner',
+ singleStoreServerName : "\$${singlestoreHostName}",
+ singleStoreUsername : "admin",
+ singleStorePassword : "secretpass",
+ singleStorePort: "3306",
+ numberOfRecords: "1000",
+ ]
+
+ // Gradle goals for this job.
+ steps {
+ gradle {
+ rootBuildScriptDir(common.checkoutDir)
+ common.setGradleSwitches(delegate)
+ switches("--info")
+ switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
+ switches("-DintegrationTestRunner=dataflow")
+ tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIODefaultMapperIT")
+ tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOSchemaTransformIT")
+ }
+ }
+ }
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java
new file mode 100644
index 00000000000..72d5f3ffa28
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapper.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NULL;
+import static java.sql.Types.REAL;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTE;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT16;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.chrono.ISOChronology;
+
+/** RowMapper that maps a {@link ResultSet} row to a Beam {@link Row}. */
+class SingleStoreDefaultRowMapper
+ implements SingleStoreIO.RowMapperWithInit<Row>, SingleStoreIO.RowMapperWithCoder<Row> {
+ @Nullable Schema schema = null;
+ List<ResultSetFieldConverter> converters = new ArrayList<>();
+
+ @Override
+ public void init(ResultSetMetaData metaData) throws SQLException {
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ converters.add(ResultSetFieldConverter.of(metaData.getColumnType(i + 1)));
+ }
+
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ schemaBuilder.addField(converters.get(i).getSchemaField(metaData, i + 1));
+ }
+ this.schema = schemaBuilder.build();
+ }
+
+ @Override
+ public Row mapRow(ResultSet resultSet) throws Exception {
+ if (schema == null) {
+ throw new UnsupportedOperationException("mapRow is called before init");
+ }
+
+ Row.Builder rowBuilder = Row.withSchema(schema);
+
+ int fieldCount = schema.getFieldCount();
+ for (int i = 0; i < fieldCount; i++) {
+ Object value = converters.get(i).getValue(resultSet, i + 1);
+
+ if (resultSet.wasNull() || value == null) {
+ rowBuilder.addValue(null);
+ } else {
+ rowBuilder.addValue(value);
+ }
+ }
+
+ return rowBuilder.build();
+ }
+
+ @Override
+ public SchemaCoder<Row> getCoder() throws Exception {
+ if (schema == null) {
+ throw new UnsupportedOperationException("getCoder is called before init");
+ }
+
+ return RowCoder.of(this.schema);
+ }
+
+ abstract static class ResultSetFieldConverter implements Serializable {
+ abstract @Nullable Object getValue(ResultSet rs, Integer index) throws SQLException;
+
+ Schema.Field getSchemaField(ResultSetMetaData md, Integer index) throws SQLException {
+ String label = md.getColumnLabel(index);
+ return Schema.Field.of(label, getSchemaFieldType(md, index))
+ .withNullable(md.isNullable(index) == java.sql.ResultSetMetaData.columnNullable);
+ }
+
+ abstract Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index)
+ throws SQLException;
+
+ /**
+ * Interface implemented by functions that extract values of different types from a JDBC
+ * ResultSet.
+ */
+ @FunctionalInterface
+ interface ResultSetFieldExtractor extends Serializable {
+ @Nullable
+ Object extract(ResultSet rs, Integer index) throws SQLException;
+ }
+
+ static ResultSetFieldConverter of(int columnType) {
+ switch (columnType) {
+ case BIT:
+ return new DirectResultSetFieldConverter(BOOLEAN, ResultSet::getBoolean);
+ case TINYINT:
+ return new DirectResultSetFieldConverter(BYTE, ResultSet::getByte);
+ case SMALLINT:
+ return new DirectResultSetFieldConverter(INT16, ResultSet::getShort);
+ case INTEGER:
+ return new DirectResultSetFieldConverter(INT32, ResultSet::getInt);
+ case BIGINT:
+ return new DirectResultSetFieldConverter(INT64, ResultSet::getLong);
+ case REAL:
+ return new DirectResultSetFieldConverter(FLOAT, ResultSet::getFloat);
+ case DOUBLE:
+ return new DirectResultSetFieldConverter(Schema.FieldType.DOUBLE, ResultSet::getDouble);
+ case DECIMAL:
+ return new DirectResultSetFieldConverter(
+ Schema.FieldType.DECIMAL, ResultSet::getBigDecimal);
+ case TIMESTAMP:
+ return new TimestampResultSetFieldConverter();
+ case DATE:
+ return new DateResultSetFieldConverter();
+ case TIME:
+ return new TimeResultSetFieldConverter();
+ case LONGVARBINARY:
+ case VARBINARY:
+ case BINARY:
+ return new BinaryResultSetFieldConverter();
+ case LONGVARCHAR:
+ case VARCHAR:
+ case CHAR:
+ return new CharResultSetFieldConverter();
+ case NULL:
+ return new DirectResultSetFieldConverter(STRING, ResultSet::getString);
+ default:
+ throw new UnsupportedOperationException(
+ "Converting " + columnType + " to Beam schema type is not supported");
+ }
+ }
+ }
+
+ static class DirectResultSetFieldConverter extends ResultSetFieldConverter {
+ Schema.FieldType fieldType;
+ ResultSetFieldExtractor extractor;
+
+ public DirectResultSetFieldConverter(
+ Schema.FieldType fieldType, ResultSetFieldExtractor extractor) {
+ this.fieldType = fieldType;
+ this.extractor = extractor;
+ }
+
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ return extractor.extract(rs, index);
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+ return fieldType;
+ }
+ }
+
+ static class CharResultSetFieldConverter extends ResultSetFieldConverter {
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ return rs.getString(index);
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException {
+ int size = md.getPrecision(index);
+ return Schema.FieldType.logicalType(VariableString.of(size));
+ }
+ }
+
+ static class BinaryResultSetFieldConverter extends ResultSetFieldConverter {
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ return rs.getBytes(index);
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) throws SQLException {
+ int size = md.getPrecision(index);
+ return Schema.FieldType.logicalType(VariableBytes.of(size));
+ }
+ }
+
+ static class TimestampResultSetFieldConverter extends ResultSetFieldConverter {
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ Timestamp ts =
+ rs.getTimestamp(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
+ if (ts == null) {
+ return null;
+ }
+ return new DateTime(ts.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+ return Schema.FieldType.DATETIME;
+ }
+ }
+
+ static class TimeResultSetFieldConverter extends ResultSetFieldConverter {
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ Time time = rs.getTime(index, Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)));
+ if (time == null) {
+ return null;
+ }
+ return new DateTime(time.getTime(), ISOChronology.getInstanceUTC())
+ .withDate(new LocalDate(0L));
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+ return Schema.FieldType.DATETIME;
+ }
+ }
+
+ static class DateResultSetFieldConverter extends ResultSetFieldConverter {
+ @Override
+ @Nullable
+ Object getValue(ResultSet rs, Integer index) throws SQLException {
+ // TODO(https://github.com/apache/beam/issues/19215) Import java.time.LocalDate once the joda LocalDate import is removed.
+ java.time.LocalDate date = rs.getObject(index, java.time.LocalDate.class);
+ if (date == null) {
+ return null;
+ }
+ ZonedDateTime zdt = date.atStartOfDay(ZoneOffset.UTC);
+ return new DateTime(zdt.toInstant().toEpochMilli(), ISOChronology.getInstanceUTC());
+ }
+
+ @Override
+ Schema.FieldType getSchemaFieldType(ResultSetMetaData md, Integer index) {
+ return Schema.FieldType.DATETIME;
+ }
+ }
+}
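The default mapper above is one implementation of the new RowMapperWithInit/RowMapperWithCoder contract (declared in SingleStoreIO.java below). As a hedged sketch, a user-defined mapper only needs to capture what it wants from the metadata in init, which is called once at pipeline construction time, and may supply its own coder:

    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.ListCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.singlestore.SingleStoreIO;

    /** Illustrative mapper that renders every column as a string. */
    class ColumnsAsStringsMapper
        implements SingleStoreIO.RowMapperWithInit<List<String>>,
            SingleStoreIO.RowMapperWithCoder<List<String>> {
      private int columnCount;

      @Override
      public void init(ResultSetMetaData metaData) throws SQLException {
        // Called once, with metadata obtained from a "LIMIT 0" probe of the query.
        columnCount = metaData.getColumnCount();
      }

      @Override
      public List<String> mapRow(ResultSet resultSet) throws Exception {
        List<String> values = new ArrayList<>(columnCount);
        for (int i = 1; i <= columnCount; i++) {
          values.add(resultSet.getString(i));
        }
        return values;
      }

      @Override
      public Coder<List<String>> getCoder() {
        // Picked up by SingleStoreUtil.inferCoder before it falls back to
        // type-based inference.
        return ListCoder.of(StringUtf8Coder.of());
      }
    }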
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java
new file mode 100644
index 00000000000..2035ce0554f
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * UserDataMapper that maps {@link Row} objects. ARRAYs, ITERABLEs, MAPs and nested ROWs are not
+ * supported.
+ */
+final class SingleStoreDefaultUserDataMapper implements SingleStoreIO.UserDataMapper<Row> {
+
+ private final transient DateTimeFormatter formatter =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+ private String convertLogicalTypeFieldToString(Schema.FieldType type, Object value) {
+ checkArgument(
+ type.getTypeName().isLogicalType(),
+ "convertLogicalTypeFieldToString accepts only logical types");
+
+ Schema.LogicalType<Object, Object> logicalType =
+ (Schema.LogicalType<Object, Object>) type.getLogicalType();
+ if (logicalType == null) {
+ throw new UnsupportedOperationException("Failed to extract logical type");
+ }
+
+ Schema.FieldType baseType = logicalType.getBaseType();
+ Object baseValue = logicalType.toBaseType(value);
+ return convertFieldToString(baseType, baseValue);
+ }
+
+ private String convertFieldToString(Schema.FieldType type, Object value) {
+ switch (type.getTypeName()) {
+ case BYTE:
+ return ((Byte) value).toString();
+ case INT16:
+ return ((Short) value).toString();
+ case INT32:
+ return ((Integer) value).toString();
+ case INT64:
+ return ((Long) value).toString();
+ case DECIMAL:
+ return ((BigDecimal) value).toString();
+ case FLOAT:
+ return ((Float) value).toString();
+ case DOUBLE:
+ return ((Double) value).toString();
+ case STRING:
+ return (String) value;
+ case DATETIME:
+ return formatter.print((Instant) value);
+ case BOOLEAN:
+ return ((Boolean) value) ? "1" : "0";
+ case BYTES:
+ return new String((byte[]) value, StandardCharsets.UTF_8);
+ case ARRAY:
+ throw new UnsupportedOperationException(
+ "Writing of ARRAY type is not supported by the default UserDataMapper");
+ case ITERABLE:
+ throw new UnsupportedOperationException(
+ "Writing of ITERABLE type is not supported by the default UserDataMapper");
+ case MAP:
+ throw new UnsupportedOperationException(
+ "Writing of MAP type is not supported by the default UserDataMapper");
+ case ROW:
+ throw new UnsupportedOperationException(
+ "Writing of nested ROW type is not supported by the default UserDataMapper");
+ case LOGICAL_TYPE:
+ return convertLogicalTypeFieldToString(type, value);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Writing of %s type is not supported by the default UserDataMapper",
+ type.getTypeName().name()));
+ }
+ }
+
+ @Override
+ public List<String> mapRow(Row element) {
+ List<String> res = new ArrayList<>();
+
+ Schema s = element.getSchema();
+ for (int i = 0; i < s.getFieldCount(); i++) {
+ res.add(convertFieldToString(s.getField(i).getType(), element.getValue(i)));
+ }
+
+ return res;
+ }
+}
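To make the conversion rules concrete, here is a small sketch of what the default mapper produces for a two-field row. The class is package-private, so this is usable as written only from within org.apache.beam.sdk.io.singlestore (the unit tests below do the same), and it assumes the corrected yyyy-MM-dd pattern:

    import java.util.List;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;
    import org.joda.time.Instant;

    Schema schema =
        Schema.builder().addBooleanField("active").addDateTimeField("ts").build();
    Row row =
        Row.withSchema(schema)
            .addValues(true, Instant.parse("2022-12-19T21:55:28.000Z"))
            .build();

    List<String> cells = new SingleStoreDefaultUserDataMapper().mapRow(row);
    // BOOLEAN is rendered as "1"/"0" and DATETIME with millisecond precision,
    // in the JVM default time zone (shown here for UTC):
    // cells -> ["1", "2022-12-19 21:55:28.000"]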
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index 6873ae6b8b3..de2a6805278 100644
--- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@@ -38,6 +39,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
@@ -53,6 +56,7 @@ import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.DelegatingStatement;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -168,6 +172,14 @@ public class SingleStoreIO {
return new AutoValue_SingleStoreIO_Read.Builder<T>().setOutputParallelization(true).build();
}
+ /** Read Beam {@link Row}s from a SingleStoreDB datasource. */
+ public static Read<Row> readRows() {
+ return new AutoValue_SingleStoreIO_Read.Builder<Row>()
+ .setRowMapper(new SingleStoreDefaultRowMapper())
+ .setOutputParallelization(true)
+ .build();
+ }
+
/**
* Like {@link #read}, but executes multiple instances of the query on the same table for each
* database partition.
@@ -178,6 +190,16 @@ public class SingleStoreIO {
return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder<T>().build();
}
+ /**
+ * Like {@link #readRows}, but executes multiple instances of the query on the same table for each
+ * database partition.
+ */
+ public static ReadWithPartitions<Row> readWithPartitionsRows() {
+ return new AutoValue_SingleStoreIO_ReadWithPartitions.Builder<Row>()
+ .setRowMapper(new SingleStoreDefaultRowMapper())
+ .build();
+ }
+
/**
* Write data to a SingleStoreDB datasource.
*
@@ -187,6 +209,13 @@ public class SingleStoreIO {
return new AutoValue_SingleStoreIO_Write.Builder<T>().build();
}
+ /** Write Beam {@link Row}s to a SingleStoreDB datasource. */
+ public static Write<Row> writeRows() {
+ return new AutoValue_SingleStoreIO_Write.Builder<Row>()
+ .setUserDataMapper(new SingleStoreDefaultUserDataMapper())
+ .build();
+ }
+
/**
* An interface used by {@link Read} and {@link ReadWithPartitions} for converting each row of the
* {@link ResultSet} into an element of the resulting {@link PCollection}.
@@ -196,6 +225,19 @@ public class SingleStoreIO {
T mapRow(ResultSet resultSet) throws Exception;
}
+ /**
+ * A RowMapper that requires initialization. The init method is called at pipeline construction
+ * time.
+ */
+ public interface RowMapperWithInit<T> extends RowMapper<T> {
+ void init(ResultSetMetaData resultSetMetaData) throws Exception;
+ }
+
+ /** A RowMapper that provides a Coder for the resulting PCollection. */
+ public interface RowMapperWithCoder<T> extends RowMapper<T> {
+ Coder<T> getCoder() throws Exception;
+ }
+
/**
* An interface used by the SingleStoreIO {@link Read} to set the parameters of the {@link
* PreparedStatement}.
@@ -219,6 +261,7 @@ public class SingleStoreIO {
* A POJO describing a SingleStoreDB {@link DataSource} by providing all properties needed to
* create it.
*/
+ @DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class DataSourceConfiguration implements Serializable {
abstract @Nullable String getEndpoint();
@@ -406,6 +449,15 @@ public class SingleStoreIO {
Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is required");
String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery());
+ if (rowMapper instanceof RowMapperWithInit) {
+ try {
+ ((RowMapperWithInit<?>) rowMapper)
+ .init(getResultSetMetadata(dataSourceConfiguration, actualQuery));
+ } catch (Exception e) {
+ throw new SingleStoreRowMapperInitializationException(e);
+ }
+ }
+
Coder<T> coder =
SingleStoreUtil.inferCoder(
rowMapper,
@@ -432,6 +484,12 @@ public class SingleStoreIO {
return output;
}
+ public static class SingleStoreRowMapperInitializationException extends RuntimeException {
+ SingleStoreRowMapperInitializationException(Throwable cause) {
+ super("Failed to initialize RowMapper", cause);
+ }
+ }
+
private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
DataSourceConfiguration dataSourceConfiguration;
String query;
@@ -595,6 +653,15 @@ public class SingleStoreIO {
String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), getQuery());
+ if (rowMapper instanceof RowMapperWithInit) {
+ try {
+ ((RowMapperWithInit<?>) rowMapper)
+ .init(getResultSetMetadata(dataSourceConfiguration, actualQuery));
+ } catch (Exception e) {
+ throw new Read.SingleStoreRowMapperInitializationException(e);
+ }
+ }
+
Coder<T> coder =
SingleStoreUtil.inferCoder(
rowMapper,
@@ -715,6 +782,28 @@ public class SingleStoreIO {
}
}
+ private static ResultSetMetaData getResultSetMetadata(
+ DataSourceConfiguration dataSourceConfiguration, String query) throws Exception {
+ DataSource dataSource = dataSourceConfiguration.getDataSource();
+ Connection conn = dataSource.getConnection();
+ try {
+ PreparedStatement stmt =
+ conn.prepareStatement(String.format("SELECT * FROM (%s) LIMIT 0", query));
+ try {
+ ResultSetMetaData md = stmt.getMetaData();
+ if (md == null) {
+ throw new Exception("ResultSetMetaData is null");
+ }
+
+ return md;
+ } finally {
+ stmt.close();
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
/**
* A {@link PTransform} for writing data to SingleStoreDB. It is used by {@link
* SingleStoreIO#write()}.
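The getResultSetMetadata helper above wraps the user query in "SELECT * FROM (...) LIMIT 0", which lets the driver report column metadata without scanning any rows. The same probe in standalone JDBC terms, as a sketch (connection string and credentials are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;

    static void probeSchema() throws SQLException {
      try (Connection conn =
              DriverManager.getConnection(
                  "jdbc:singlestore://hostname:3306/db", "admin", "secretpass");
          PreparedStatement stmt =
              conn.prepareStatement("SELECT * FROM (SELECT * FROM source_table) LIMIT 0")) {
        // Some drivers return null before execution; the IO throws in that case.
        ResultSetMetaData md = stmt.getMetaData();
        if (md == null) {
          throw new IllegalStateException("driver returned no metadata");
        }
        for (int i = 1; i <= md.getColumnCount(); i++) {
          System.out.println(md.getColumnLabel(i) + " " + md.getColumnTypeName(i));
        }
      }
    }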
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
index c76b0a678c1..f64885e723f 100644
--- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtil.java
@@ -42,6 +42,14 @@ final class SingleStoreUtil {
CoderRegistry registry,
SchemaRegistry schemaRegistry,
Logger log) {
+ if (rowMapper instanceof SingleStoreIO.RowMapperWithCoder) {
+ try {
+ return ((SingleStoreIO.RowMapperWithCoder<OutputT>) rowMapper).getCoder();
+ } catch (Exception e) {
+ log.warn("Unable to infer a coder from RowMapper. Attempting to infer a coder from type.");
+ }
+ }
+
TypeDescriptor<OutputT> outputType =
TypeDescriptors.extractFromTypeParameters(
rowMapper,
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java
new file mode 100644
index 00000000000..6951e9b3183
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadConfiguration.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from SingleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformReadProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformReadConfiguration {
+
+ /** Instantiates a {@link SingleStoreSchemaTransformReadConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_SingleStoreSchemaTransformReadConfiguration.Builder();
+ }
+
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+ private static final TypeDescriptor<SingleStoreSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+ TypeDescriptor.of(SingleStoreSchemaTransformReadConfiguration.class);
+ private static final SerializableFunction<SingleStoreSchemaTransformReadConfiguration, Row>
+ ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /** Serializes configuration to a {@link Row}. */
+ public Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ @Nullable
+ public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+ @Nullable
+ public abstract String getQuery();
+
+ @Nullable
+ public abstract String getTable();
+
+ @Nullable
+ public abstract Boolean getOutputParallelization();
+
+ @Nullable
+ public abstract Boolean getWithPartitions();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);
+
+ public abstract Builder setTable(String value);
+
+ public abstract Builder setQuery(String value);
+
+ public abstract Builder setOutputParallelization(Boolean value);
+
+ public abstract Builder setWithPartitions(Boolean value);
+
+ /** Builds the {@link SingleStoreSchemaTransformReadConfiguration} configuration. */
+ public abstract SingleStoreSchemaTransformReadConfiguration build();
+ }
+}
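A short sketch of how this configuration is meant to be built and serialized; dataSourceConfiguration is assumed to be a SingleStoreIO.DataSourceConfiguration built elsewhere (e.g. as in the earlier sketch):

    SingleStoreSchemaTransformReadConfiguration readConfig =
        SingleStoreSchemaTransformReadConfiguration.builder()
            .setDataSourceConfiguration(dataSourceConfiguration)
            .setTable("source_table")
            .setWithPartitions(true)
            .build();

    // Serialized form used to configure the transform across SDKs.
    Row configRow = readConfig.toBeamRow();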
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..601fd1a89d1
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB read jobs configured
+ * using {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+public class SingleStoreSchemaTransformReadProvider
+ extends TypedSchemaTransformProvider<SingleStoreSchemaTransformReadConfiguration> {
+ private static final String OUTPUT_TAG = "OUTPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<SingleStoreSchemaTransformReadConfiguration> configurationClass() {
+ return SingleStoreSchemaTransformReadConfiguration.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) {
+ return new SingleStoreReadSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:singlestore_read:v1";
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+ * no input is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+ * a single output is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using
+ * {@link SingleStoreSchemaTransformReadConfiguration}.
+ */
+ private static class SingleStoreReadSchemaTransform implements SchemaTransform {
+ private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+ SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /** Implements {@link SchemaTransform} buildTransform method. */
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link
+ * SingleStoreSchemaTransformReadConfiguration}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final SingleStoreSchemaTransformReadConfiguration configuration;
+
+ PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ if (!input.getAll().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s input is expected to be empty",
+ input.getClass().getSimpleName(), getClass().getSimpleName()));
+ }
+ SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+ configuration.getDataSourceConfiguration();
+ String table = configuration.getTable();
+ String query = configuration.getQuery();
+ Boolean outputParallelization = configuration.getOutputParallelization();
+ Boolean withPartitions = configuration.getWithPartitions();
+
+ Preconditions.checkArgument(
+ !(outputParallelization != null && withPartitions != null && withPartitions),
+ "outputParallelization parameter is not supported for partitioned read");
+
+ if (withPartitions != null && withPartitions) {
+ SingleStoreIO.ReadWithPartitions<Row> readWithPartitions =
+ SingleStoreIO.readWithPartitionsRows();
+
+ if (dataSourceConfiguration != null) {
+ readWithPartitions =
+ readWithPartitions.withDataSourceConfiguration(dataSourceConfiguration);
+ }
+
+ if (table != null && !table.isEmpty()) {
+ readWithPartitions = readWithPartitions.withTable(table);
+ }
+
+ if (query != null && !query.isEmpty()) {
+ readWithPartitions = readWithPartitions.withQuery(query);
+ }
+
+ PCollection<Row> rows = input.getPipeline().apply(readWithPartitions);
+ Schema schema = rows.getSchema();
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema));
+ } else {
+ SingleStoreIO.Read<Row> read = SingleStoreIO.readRows();
+
+ if (dataSourceConfiguration != null) {
+ read = read.withDataSourceConfiguration(dataSourceConfiguration);
+ }
+
+ if (table != null && !table.isEmpty()) {
+ read = read.withTable(table);
+ }
+
+ if (query != null && !query.isEmpty()) {
+ read = read.withQuery(query);
+ }
+
+ if (outputParallelization != null) {
+ read = read.withOutputParallelization(outputParallelization);
+ }
+
+ PCollection<Row> rows = input.getPipeline().apply(read);
+ Schema schema = rows.getSchema();
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, rows.setRowSchema(schema));
+ }
+ }
+ }
+}
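Putting provider and configuration together, a hedged sketch of expanding the read transform on an empty PCollectionRowTuple (the read takes no input; pipeline and readConfig are carried over from the sketches above). Note that setOutputParallelization(...) cannot be combined with setWithPartitions(true), per the checkArgument in expand():

    SingleStoreSchemaTransformReadProvider provider =
        new SingleStoreSchemaTransformReadProvider();
    // provider.identifier() is "beam:schematransform:org.apache.beam:singlestore_read:v1"

    PCollectionRowTuple output =
        PCollectionRowTuple.empty(pipeline)
            .apply(provider.from(readConfig.toBeamRow()).buildTransform());
    PCollection<Row> rows = output.get("OUTPUT");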
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java
new file mode 100644
index 00000000000..2903035e402
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteConfiguration.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for writing to SingleStoreDB.
+ *
+ * <p>This class is meant to be used with {@link SingleStoreSchemaTransformWriteProvider}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SingleStoreSchemaTransformWriteConfiguration {
+
+ /** Instantiates a {@link SingleStoreSchemaTransformWriteConfiguration.Builder}. */
+ public static Builder builder() {
+ return new AutoValue_SingleStoreSchemaTransformWriteConfiguration.Builder();
+ }
+
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+ private static final TypeDescriptor<SingleStoreSchemaTransformWriteConfiguration>
+ TYPE_DESCRIPTOR = TypeDescriptor.of(SingleStoreSchemaTransformWriteConfiguration.class);
+ private static final SerializableFunction<SingleStoreSchemaTransformWriteConfiguration, Row>
+ ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /** Serializes configuration to a {@link Row}. */
+ public Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ @Nullable
+ public abstract SingleStoreIO.DataSourceConfiguration getDataSourceConfiguration();
+
+ @Nullable
+ public abstract String getTable();
+
+ @Nullable
+ public abstract Integer getBatchSize();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setDataSourceConfiguration(SingleStoreIO.DataSourceConfiguration value);
+
+ public abstract Builder setTable(String value);
+
+ public abstract Builder setBatchSize(Integer value);
+
+ /** Builds the {@link SingleStoreSchemaTransformWriteConfiguration} configuration. */
+ public abstract SingleStoreSchemaTransformWriteConfiguration build();
+ }
+}
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java
new file mode 100644
index 00000000000..5b68e1e05c5
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore.schematransform;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for SingleStoreDB write jobs configured
+ * using {@link SingleStoreSchemaTransformWriteConfiguration}.
+ */
+public class SingleStoreSchemaTransformWriteProvider
+ extends TypedSchemaTransformProvider<SingleStoreSchemaTransformWriteConfiguration> {
+
+ private static final String OUTPUT_TAG = "OUTPUT";
+ public static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<SingleStoreSchemaTransformWriteConfiguration> configurationClass() {
+ return SingleStoreSchemaTransformWriteConfiguration.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) {
+ return new SingleStoreWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:singlestore_write:v1";
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+ * a single input collection is expected, this returns a list with the single input name.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+ * a single output is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using
+ * {@link SingleStoreSchemaTransformWriteConfiguration}.
+ */
+ private static class SingleStoreWriteSchemaTransform implements SchemaTransform {
+ private final SingleStoreSchemaTransformWriteConfiguration configuration;
+
+ SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /** Implements {@link SchemaTransform} buildTransform method. */
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for SingleStoreDB write jobs configured using {@link
+ * SingleStoreSchemaTransformWriteConfiguration}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final SingleStoreSchemaTransformWriteConfiguration configuration;
+
+ PCollectionRowTupleTransform(SingleStoreSchemaTransformWriteConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ if (!input.has(INPUT_TAG)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s is missing expected tag: %s",
+ getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+ }
+ SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+ configuration.getDataSourceConfiguration();
+ String table = configuration.getTable();
+ Integer batchSize = configuration.getBatchSize();
+
+ SingleStoreIO.Write<Row> write = SingleStoreIO.writeRows();
+
+ if (dataSourceConfiguration != null) {
+ write = write.withDataSourceConfiguration(dataSourceConfiguration);
+ }
+
+ if (table != null && !table.isEmpty()) {
+ write = write.withTable(table);
+ }
+
+ if (batchSize != null) {
+ write = write.withBatchSize(batchSize);
+ }
+
+ PCollection<Integer> res = input.get(INPUT_TAG).apply(write);
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("rowsWritten", Schema.FieldType.INT32);
+ Schema schema = schemaBuilder.build();
+ return PCollectionRowTuple.of(
+ OUTPUT_TAG,
+ res.apply(
+ MapElements.into(TypeDescriptor.of(Row.class))
+ .via(
+ (SerializableFunction<Integer, Row>)
+ a -> {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue(a);
+ return rowBuilder.build();
+ }))
+ .setRowSchema(schema));
+ }
+ }
+}
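And the matching sketch for the write side, continuing the names from the sketches above: the input PCollection<Row> is passed under the public INPUT tag, and the transform reports the number of rows written as a single-field Row under OUTPUT:

    SingleStoreSchemaTransformWriteConfiguration writeConfig =
        SingleStoreSchemaTransformWriteConfiguration.builder()
            .setDataSourceConfiguration(dataSourceConfiguration)
            .setTable("destination_table")
            .setBatchSize(1000)
            .build();

    SingleStoreSchemaTransformWriteProvider writeProvider =
        new SingleStoreSchemaTransformWriteProvider();

    PCollectionRowTuple result =
        PCollectionRowTuple.of(SingleStoreSchemaTransformWriteProvider.INPUT_TAG, rows)
            .apply(writeProvider.from(writeConfig.toBeamRow()).buildTransform());

    // Each element has a single "rowsWritten" INT32 field.
    PCollection<Row> rowsWritten = result.get("OUTPUT");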
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java
new file mode 100644
index 00000000000..d71e5eb209e
--- /dev/null
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** SingleStoreIO SchemaTransforms. */
+package org.apache.beam.sdk.io.singlestore.schematransform;
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java
new file mode 100644
index 00000000000..54de09a4e95
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultRowMapperTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NULL;
+import static java.sql.Types.REAL;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.singlestore.jdbc.client.result.ResultSetMetaData;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test DefaultRowMapper. */
+@RunWith(JUnit4.class)
+public class SingleStoreDefaultRowMapperTest {
+ @Test
+ public void testEmptyRow() throws Exception {
+ ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+ Mockito.when(md.getColumnCount()).thenReturn(0);
+ ResultSet res = Mockito.mock(ResultSet.class);
+
+ SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+ mapper.init(md);
+ Schema s = mapper.getCoder().getSchema();
+ Row r = mapper.mapRow(res);
+
+ assertEquals(0, s.getFieldCount());
+ assertEquals(0, r.getFieldCount());
+ }
+
+ @Test
+ public void testAllDataTypes() throws Exception {
+ ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+
+ Mockito.when(md.getColumnCount()).thenReturn(17);
+
+ Mockito.when(md.getColumnType(1)).thenReturn(BIT);
+ Mockito.when(md.getColumnType(2)).thenReturn(TINYINT);
+ Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT);
+ Mockito.when(md.getColumnType(4)).thenReturn(INTEGER);
+ Mockito.when(md.getColumnType(5)).thenReturn(BIGINT);
+ Mockito.when(md.getColumnType(6)).thenReturn(REAL);
+ Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE);
+ Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL);
+ Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP);
+ Mockito.when(md.getColumnType(10)).thenReturn(DATE);
+ Mockito.when(md.getColumnType(11)).thenReturn(TIME);
+ Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY);
+ Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY);
+ Mockito.when(md.getColumnType(14)).thenReturn(BINARY);
+ Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR);
+ Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR);
+ Mockito.when(md.getColumnType(17)).thenReturn(CHAR);
+
+ for (int i = 12; i <= 17; i++) {
+ Mockito.when(md.getPrecision(i)).thenReturn(10);
+ }
+
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i);
+ Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNoNulls);
+ }
+
+ ResultSet res = Mockito.mock(ResultSet.class);
+
+ Mockito.when(res.getBoolean(1)).thenReturn(true);
+ Mockito.when(res.getByte(2)).thenReturn((byte) 10);
+ Mockito.when(res.getShort(3)).thenReturn((short) 10);
+ Mockito.when(res.getInt(4)).thenReturn(10);
+ Mockito.when(res.getLong(5)).thenReturn((long) 10);
+ Mockito.when(res.getFloat(6)).thenReturn((float) 10.1);
+ Mockito.when(res.getDouble(7)).thenReturn(10.1);
+ Mockito.when(res.getBigDecimal(8)).thenReturn(new BigDecimal("100.100"));
+ Mockito.when(res.getTimestamp(Mockito.eq(9), Mockito.any()))
+ .thenReturn(Timestamp.valueOf("2022-10-10 10:10:10"));
+ Mockito.when(res.getObject(10, java.time.LocalDate.class))
+ .thenReturn(LocalDate.of(2022, 10, 10));
+ Mockito.when(res.getTime(Mockito.eq(11), Mockito.any())).thenReturn(Time.valueOf("10:10:10"));
+ Mockito.when(res.getBytes(12)).thenReturn("asd".getBytes(StandardCharsets.UTF_8));
+ Mockito.when(res.getBytes(13)).thenReturn("asd".getBytes(StandardCharsets.UTF_8));
+ Mockito.when(res.getBytes(14)).thenReturn("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8));
+ Mockito.when(res.getString(15)).thenReturn("asd");
+ Mockito.when(res.getString(16)).thenReturn("asd");
+ Mockito.when(res.getString(17)).thenReturn("asd");
+
+ Mockito.when(res.wasNull()).thenReturn(false);
+
+ SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+ mapper.init(md);
+ Schema s = mapper.getCoder().getSchema();
+ Row r = mapper.mapRow(res);
+
+ assertEquals(17, s.getFieldCount());
+ for (int i = 0; i < s.getFieldCount(); i++) {
+ assertEquals("c" + (i + 1), s.getField(i).getName());
+ }
+
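+ // Numeric and temporal columns map to primitive Beam field types; binary and char columns map to logical types.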
+ assertEquals(Schema.FieldType.BOOLEAN, s.getField(0).getType());
+ assertEquals(Schema.FieldType.BYTE, s.getField(1).getType());
+ assertEquals(Schema.FieldType.INT16, s.getField(2).getType());
+ assertEquals(Schema.FieldType.INT32, s.getField(3).getType());
+ assertEquals(Schema.FieldType.INT64, s.getField(4).getType());
+ assertEquals(Schema.FieldType.FLOAT, s.getField(5).getType());
+ assertEquals(Schema.FieldType.DOUBLE, s.getField(6).getType());
+ assertEquals(Schema.FieldType.DECIMAL, s.getField(7).getType());
+ assertEquals(Schema.FieldType.DATETIME, s.getField(8).getType());
+ assertEquals(Schema.FieldType.DATETIME, s.getField(9).getType());
+ assertEquals(Schema.FieldType.DATETIME, s.getField(10).getType());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(11).getType().getTypeName());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(12).getType().getTypeName());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(13).getType().getTypeName());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(14).getType().getTypeName());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(15).getType().getTypeName());
+ assertEquals(Schema.TypeName.LOGICAL_TYPE, s.getField(16).getType().getTypeName());
+ assertEquals(Schema.FieldType.BYTES, s.getField(11).getType().getLogicalType().getBaseType());
+ assertEquals(Schema.FieldType.BYTES, s.getField(12).getType().getLogicalType().getBaseType());
+ assertEquals(Schema.FieldType.BYTES, s.getField(13).getType().getLogicalType().getBaseType());
+ assertEquals(Schema.FieldType.STRING, s.getField(14).getType().getLogicalType().getBaseType());
+ assertEquals(Schema.FieldType.STRING, s.getField(15).getType().getLogicalType().getBaseType());
+ assertEquals(Schema.FieldType.STRING, s.getField(16).getType().getLogicalType().getBaseType());
+
+ assertEquals(17, r.getFieldCount());
+
+ assertEquals(true, r.getBoolean(0));
+ assertEquals((Byte) (byte) 10, r.getByte(1));
+ assertEquals((Short) (short) 10, r.getInt16(2));
+ assertEquals((Integer) 10, r.getInt32(3));
+ assertEquals((Long) (long) 10, r.getInt64(4));
+ assertEquals((Float) (float) 10.1, r.getFloat(5));
+ assertEquals((Double) 10.1, r.getDouble(6));
+ assertEquals(new BigDecimal("100.100"), r.getDecimal(7));
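+ // TIMESTAMP keeps both date and time, DATE maps to midnight, and TIME is placed on the epoch date.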
+ assertEquals(0, new DateTime("2022-10-10T10:10:10").compareTo(r.getDateTime(8)));
+ assertEquals(0, new DateTime("2022-10-10T00:00:00Z").compareTo(r.getDateTime(9)));
+ assertEquals(0, new DateTime("1970-01-01T10:10:10").compareTo(r.getDateTime(10)));
+ assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11));
+ assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12));
+ assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13));
+ assertEquals("asd", r.getString(14));
+ assertEquals("asd", r.getString(15));
+ assertEquals("asd", r.getString(16));
+ }
+
+ @Test
+ public void testNullValues() throws Exception {
+ ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
+
+ Mockito.when(md.getColumnCount()).thenReturn(18);
+
+ Mockito.when(md.getColumnType(1)).thenReturn(BIT);
+ Mockito.when(md.getColumnType(2)).thenReturn(TINYINT);
+ Mockito.when(md.getColumnType(3)).thenReturn(SMALLINT);
+ Mockito.when(md.getColumnType(4)).thenReturn(INTEGER);
+ Mockito.when(md.getColumnType(5)).thenReturn(BIGINT);
+ Mockito.when(md.getColumnType(6)).thenReturn(REAL);
+ Mockito.when(md.getColumnType(7)).thenReturn(DOUBLE);
+ Mockito.when(md.getColumnType(8)).thenReturn(DECIMAL);
+ Mockito.when(md.getColumnType(9)).thenReturn(TIMESTAMP);
+ Mockito.when(md.getColumnType(10)).thenReturn(DATE);
+ Mockito.when(md.getColumnType(11)).thenReturn(TIME);
+ Mockito.when(md.getColumnType(12)).thenReturn(LONGVARBINARY);
+ Mockito.when(md.getColumnType(13)).thenReturn(VARBINARY);
+ Mockito.when(md.getColumnType(14)).thenReturn(BINARY);
+ Mockito.when(md.getColumnType(15)).thenReturn(LONGVARCHAR);
+ Mockito.when(md.getColumnType(16)).thenReturn(VARCHAR);
+ Mockito.when(md.getColumnType(17)).thenReturn(CHAR);
+ Mockito.when(md.getColumnType(18)).thenReturn(NULL);
+
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ Mockito.when(md.getColumnLabel(i)).thenReturn("c" + i);
+ Mockito.when(md.isNullable(i)).thenReturn(java.sql.ResultSetMetaData.columnNullable);
+ }
+
+ ResultSet res = Mockito.mock(ResultSet.class);
+
+ Mockito.when(res.getBoolean(1)).thenReturn(false);
+ Mockito.when(res.getByte(2)).thenReturn((byte) 0);
+ Mockito.when(res.getShort(3)).thenReturn((short) 0);
+ Mockito.when(res.getInt(4)).thenReturn(0);
+ Mockito.when(res.getLong(5)).thenReturn((long) 0);
+ Mockito.when(res.getFloat(6)).thenReturn((float) 0);
+ Mockito.when(res.getDouble(7)).thenReturn(0.0);
+ Mockito.when(res.getBigDecimal(8)).thenReturn(null);
+ Mockito.when(res.getTimestamp(9)).thenReturn(null);
+ Mockito.when(res.getDate(10)).thenReturn(null);
+ Mockito.when(res.getTime(11)).thenReturn(null);
+ Mockito.when(res.getBytes(12)).thenReturn(null);
+ Mockito.when(res.getBytes(13)).thenReturn(null);
+ Mockito.when(res.getBytes(14)).thenReturn(null);
+ Mockito.when(res.getString(15)).thenReturn(null);
+ Mockito.when(res.getString(16)).thenReturn(null);
+ Mockito.when(res.getString(17)).thenReturn(null);
+ Mockito.when(res.getString(18)).thenReturn(null);
+
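+ // wasNull() reports true, so every column is expected to map to null.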
+ Mockito.when(res.wasNull()).thenReturn(true);
+
+ SingleStoreDefaultRowMapper mapper = new SingleStoreDefaultRowMapper();
+ mapper.init(md);
+ Schema s = mapper.getCoder().getSchema();
+ Row r = mapper.mapRow(res);
+
+ assertEquals(18, s.getFieldCount());
+ for (int i = 0; i < s.getFieldCount(); i++) {
+ assertEquals("c" + (i + 1), s.getField(i).getName());
+ }
+
+ assertEquals(18, r.getFieldCount());
+ for (int i = 0; i < r.getFieldCount(); i++) {
+ assertNull(r.getValue(i));
+ }
+ }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
new file mode 100644
index 00000000000..6472ad13cff
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreDefaultUserDataMapperTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SingleStoreDefaultUserDataMapper}. */
+@RunWith(JUnit4.class)
+public class SingleStoreDefaultUserDataMapperTest {
+ @Test
+ public void testNullValues() {
+ // The exact string encoding of SQL NULL is an implementation detail, so this sketch only
+ // checks that a null value in a nullable field still yields one output value per field.
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addNullableField("string", Schema.FieldType.STRING);
+ Schema schema = schemaBuilder.build();
+
+ Row row = Row.withSchema(schema).addValue((String) null).build();
+
+ SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+ List<String> res = mapper.mapRow(row);
+
+ assertEquals(1, res.size());
+ }
+
+ @Test
+ public void testBigNumbers() {
+ // Mirrors testBigNegativeNumbers below with the corresponding maximum values.
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+ schemaBuilder.addField("int16", Schema.FieldType.INT16);
+ schemaBuilder.addField("int32", Schema.FieldType.INT32);
+ schemaBuilder.addField("int64", Schema.FieldType.INT64);
+ schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+ schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+ schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+ Schema schema = schemaBuilder.build();
+
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue(Byte.MAX_VALUE);
+ rowBuilder.addValue(Short.MAX_VALUE);
+ rowBuilder.addValue(Integer.MAX_VALUE);
+ rowBuilder.addValue(Long.MAX_VALUE);
+ rowBuilder.addValue(Float.MAX_VALUE);
+ rowBuilder.addValue(Double.MAX_VALUE);
+ rowBuilder.addValue(new BigDecimal("10000000000000.1000000000000000000000"));
+ Row row = rowBuilder.build();
+
+ SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+ List<String> res = mapper.mapRow(row);
+
+ assertEquals(7, res.size());
+ assertEquals("127", res.get(0));
+ assertEquals("32767", res.get(1));
+ assertEquals("2147483647", res.get(2));
+ assertEquals("9223372036854775807", res.get(3));
+ assertEquals("3.4028235E38", res.get(4));
+ assertEquals("1.7976931348623157E308", res.get(5));
+ assertEquals("10000000000000.1000000000000000000000", res.get(6));
+ }
+
+ @Test
+ public void testBigNegativeNumbers() {
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+ schemaBuilder.addField("int16", Schema.FieldType.INT16);
+ schemaBuilder.addField("int32", Schema.FieldType.INT32);
+ schemaBuilder.addField("int64", Schema.FieldType.INT64);
+ schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+ schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+ schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+ Schema schema = schemaBuilder.build();
+
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue(Byte.MIN_VALUE);
+ rowBuilder.addValue(Short.MIN_VALUE);
+ rowBuilder.addValue(Integer.MIN_VALUE);
+ rowBuilder.addValue(Long.MIN_VALUE);
+ rowBuilder.addValue(-Float.MAX_VALUE);
+ rowBuilder.addValue(-Double.MAX_VALUE);
+ rowBuilder.addValue(new BigDecimal("-10000000000000.1000000000000000000000"));
+ Row row = rowBuilder.build();
+
+ SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+ List<String> res = mapper.mapRow(row);
+
+ assertEquals(7, res.size());
+ assertEquals("-128", res.get(0));
+ assertEquals("-32768", res.get(1));
+ assertEquals("-2147483648", res.get(2));
+ assertEquals("-9223372036854775808", res.get(3));
+ assertEquals("-3.4028235E38", res.get(4));
+ assertEquals("-1.7976931348623157E308", res.get(5));
+ assertEquals("-10000000000000.1000000000000000000000", res.get(6));
+ }
+
+ @Test
+ public void testEmptyRow() {
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ Schema schema = schemaBuilder.build();
+
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ Row row = rowBuilder.build();
+
+ SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+ List<String> res = mapper.mapRow(row);
+
+ assertEquals(0, res.size());
+ }
+
+ @Test
+ public void testAllDataTypes() {
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("byte", Schema.FieldType.BYTE);
+ schemaBuilder.addField("int16", Schema.FieldType.INT16);
+ schemaBuilder.addField("int32", Schema.FieldType.INT32);
+ schemaBuilder.addField("int64", Schema.FieldType.INT64);
+ schemaBuilder.addField("float", Schema.FieldType.FLOAT);
+ schemaBuilder.addField("double", Schema.FieldType.DOUBLE);
+ schemaBuilder.addField("decimal", Schema.FieldType.DECIMAL);
+ schemaBuilder.addField("boolean", Schema.FieldType.BOOLEAN);
+ schemaBuilder.addField("datetime", Schema.FieldType.DATETIME);
+ schemaBuilder.addField("bytes", Schema.FieldType.BYTES);
+ schemaBuilder.addField("string", Schema.FieldType.STRING);
+ Schema schema = schemaBuilder.build();
+
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue((byte) 10);
+ rowBuilder.addValue((short) 10);
+ rowBuilder.addValue(10);
+ rowBuilder.addValue((long) 10);
+ rowBuilder.addValue((float) 10.1);
+ rowBuilder.addValue(10.1);
+ rowBuilder.addValue(new BigDecimal("10.1"));
+ rowBuilder.addValue(false);
+ rowBuilder.addValue(new DateTime("2022-01-01T10:10:10.012Z"));
+ rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+ rowBuilder.addValue("asd");
+ Row row = rowBuilder.build();
+
+ SingleStoreDefaultUserDataMapper mapper = new SingleStoreDefaultUserDataMapper();
+ List<String> res = mapper.mapRow(row);
+
+ assertEquals(11, res.size());
+ assertEquals("10", res.get(0));
+ assertEquals("10", res.get(1));
+ assertEquals("10", res.get(2));
+ assertEquals("10", res.get(3));
+ assertEquals("10.1", res.get(4));
+ assertEquals("10.1", res.get(5));
+ assertEquals("10.1", res.get(6));
+ assertEquals("0", res.get(7));
+ assertEquals("2022-01-01 10:10:10.012", res.get(8));
+ assertEquals("asd", res.get(9));
+ assertEquals("asd", res.get(10));
+ }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
new file mode 100644
index 00000000000..32fc9d1a11f
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIODefaultMapperIT {
+ private static final String DATABASE_NAME = "SingleStoreIOIT";
+
+ private static int numberOfRows;
+
+ private static String tableName;
+
+ private static String serverName;
+
+ private static String username;
+
+ private static String password;
+
+ private static Integer port;
+
+ private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
+
+ private static Schema schema;
+
+ @BeforeClass
+ public static void setup() {
+ SingleStoreIOTestPipelineOptions options;
+ try {
+ options = readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+ } catch (IllegalArgumentException e) {
+ options = null;
+ }
+ org.junit.Assume.assumeNotNull(options);
+
+ numberOfRows = options.getNumberOfRecords();
+ serverName = options.getSingleStoreServerName();
+ username = options.getSingleStoreUsername();
+ password = options.getSingleStorePassword();
+ port = options.getSingleStorePort();
+ tableName = DatabaseTestHelper.getTestTableName("IT");
+ dataSourceConfiguration =
+ SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+ .withDatabase(DATABASE_NAME)
+ .withPassword(password)
+ .withUsername(username);
+
+ generateSchema();
+ }
+
+ private static void generateSchema() {
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("c1", Schema.FieldType.BOOLEAN);
+ schemaBuilder.addField("c2", Schema.FieldType.BYTE);
+ schemaBuilder.addField("c3", Schema.FieldType.INT16);
+ schemaBuilder.addField("c4", Schema.FieldType.INT32);
+ schemaBuilder.addField("c5", Schema.FieldType.INT64);
+ schemaBuilder.addField("c6", Schema.FieldType.FLOAT);
+ schemaBuilder.addField("c7", Schema.FieldType.DOUBLE);
+ schemaBuilder.addField("c8", Schema.FieldType.DECIMAL);
+ schemaBuilder.addField("c9", Schema.FieldType.DATETIME);
+ schemaBuilder.addField("c10", Schema.FieldType.DATETIME);
+ schemaBuilder.addField("c11", Schema.FieldType.DATETIME);
+ schemaBuilder.addField("c12", Schema.FieldType.BYTES);
+ schemaBuilder.addField("c13", Schema.FieldType.BYTES);
+ schemaBuilder.addField("c14", Schema.FieldType.BYTES);
+ schemaBuilder.addField("c15", Schema.FieldType.STRING);
+ schemaBuilder.addField("c16", Schema.FieldType.STRING);
+ schemaBuilder.addField("c17", Schema.FieldType.STRING);
+
+ schema = schemaBuilder.build();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWriteThenRead() throws Exception {
+ TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
+ createTable();
+
+ try {
+ PipelineResult writeResult = runWrite();
+ assertEquals(PipelineResult.State.DONE, writeResult.waitUntilFinish());
+ PipelineResult readResult = runRead();
+ assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
+ } finally {
+ DataSource dataSource = dataSourceConfiguration.getDataSource();
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+ }
+
+ private void createTable() throws SQLException {
+ DataSource dataSource = dataSourceConfiguration.getDataSource();
+ try (Connection conn = dataSource.getConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("DROP TABLE IF EXISTS " + tableName);
+ stmt.executeUpdate(
+ "CREATE TABLE "
+ + tableName
+ + "("
+ + "c1 BIT, "
+ + "c2 TINYINT, "
+ + "c3 SMALLINT, "
+ + "c4 INTEGER, "
+ + "c5 BIGINT, "
+ + "c6 FLOAT, "
+ + "c7 DOUBLE, "
+ + "c8 DECIMAL(10, 5), "
+ + "c9 TIMESTAMP, "
+ + "c10 DATE, "
+ + "c11 TIME, "
+ + "c12 BLOB, "
+ + "c13 TINYBLOB, "
+ + "c14 BINARY(10), "
+ + "c15 MEDIUMTEXT, "
+ + "c16 TINYTEXT, "
+ + "c17 CHAR(10) "
+ + ")");
+ }
+ }
+
+ @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+ @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+ public static class ConstructRowFn extends DoFn<Long, Row> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+
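+ // Every sequence element emits the same fixed row covering each column type.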
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue(Boolean.TRUE);
+ rowBuilder.addValue((byte) 10);
+ rowBuilder.addValue((short) 10);
+ rowBuilder.addValue(10);
+ rowBuilder.addValue((long) 10);
+ rowBuilder.addValue((float) 10.1);
+ rowBuilder.addValue(10.1);
+ rowBuilder.addValue(new BigDecimal("10.1"));
+ rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+ rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+ rowBuilder.addValue(new DateTime("2022-01-01T10:10:10Z"));
+ rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+ rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+ rowBuilder.addValue("asd".getBytes(StandardCharsets.UTF_8));
+ rowBuilder.addValue("asd");
+ rowBuilder.addValue("asd");
+ rowBuilder.addValue("asd");
+ Row row = rowBuilder.build();
+
+ c.output(row);
+ }
+ }
+
+ private PipelineResult runWrite() {
+ pipelineWrite
+ .apply(GenerateSequence.from(0).to(numberOfRows))
+ .apply(ParDo.of(new ConstructRowFn()))
+ .setRowSchema(schema)
+ .apply(
+ SingleStoreIO.writeRows()
+ .withDataSourceConfiguration(dataSourceConfiguration)
+ .withTable(tableName));
+
+ return pipelineWrite.run();
+ }
+
+ private PipelineResult runRead() {
+ PCollection<Row> res =
+ pipelineRead.apply(
+ SingleStoreIO.readRows()
+ .withDataSourceConfiguration(dataSourceConfiguration)
+ .withTable(tableName));
+
+ PAssert.thatSingleton(res.apply("Count All", Count.globally())).isEqualTo((long) numberOfRows);
+
+ res.apply(ParDo.of(new CheckDoFn())).setCoder(VoidCoder.of());
+
+ return pipelineRead.run();
+ }
+
+ private static class CheckDoFn extends DoFn<Row, Void> {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ Row r = c.element();
+ Assert.assertNotNull(r);
+ assertEquals(Boolean.TRUE, r.getBoolean(0));
+ assertEquals((Byte) (byte) 10, r.getByte(1));
+ assertEquals((Short) (short) 10, r.getInt16(2));
+ assertEquals((Integer) 10, r.getInt32(3));
+ assertEquals((Long) (long) 10, r.getInt64(4));
+ assertEquals((Float) (float) 10.1, r.getFloat(5));
+ assertEquals((Double) 10.1, r.getDouble(6));
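+ // DECIMAL(10, 5) values are read back with the full declared scale.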
+ assertEquals(new BigDecimal("10.10000"), r.getDecimal(7));
+ assertEquals(0, new DateTime("2022-01-01T10:10:10Z").compareTo(r.getDateTime(8)));
+ assertEquals(0, new DateTime("2022-01-01T00:00:00Z").compareTo(r.getDateTime(9)));
+ assertEquals(0, new DateTime("1970-01-01T10:10:10Z").compareTo(r.getDateTime(10)));
+ assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(11));
+ assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), r.getBytes(12));
+ assertArrayEquals("asd\0\0\0\0\0\0\0".getBytes(StandardCharsets.UTF_8), r.getBytes(13));
+ assertEquals("asd", r.getString(14));
+ assertEquals("asd", r.getString(15));
+ assertEquals("asd", r.getString(16));
+ }
+ }
+}
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
similarity index 92%
copy from sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
copy to sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
index cdbcacd0443..8adf8a47dc1 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOPerformanceIT.java
@@ -22,9 +22,6 @@ import static org.junit.Assert.assertEquals;
import com.google.cloud.Timestamp;
import com.singlestore.jdbc.SingleStoreDataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -58,9 +55,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-public class SingleStoreIOIT {
+public class SingleStoreIOPerformanceIT {
- private static final String NAMESPACE = SingleStoreIOIT.class.getName();
+ private static final String NAMESPACE = SingleStoreIOPerformanceIT.class.getName();
private static final String DATABASE_NAME = "SingleStoreIOIT";
@@ -109,22 +106,10 @@ public class SingleStoreIOIT {
.get();
}
- void createDatabaseIfNotExists() throws SQLException {
- DataSource dataSource =
- new SingleStoreDataSource(
- String.format(
- "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
- serverName, port, username, password));
- try (Connection conn = dataSource.getConnection();
- Statement stmt = conn.createStatement()) {
- stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_NAME));
- }
- }
-
@Test
@Category(NeedsRunner.class)
public void testWriteThenRead() throws Exception {
- createDatabaseIfNotExists();
+ TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
DataSource dataSource =
new SingleStoreDataSource(
String.format(
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
similarity index 53%
rename from sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
rename to sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
index cdbcacd0443..4ded4cd452a 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOIT.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java
@@ -19,37 +19,38 @@ package org.apache.beam.sdk.io.singlestore;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-import com.google.cloud.Timestamp;
import com.singlestore.jdbc.SingleStoreDataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
import javax.sql.DataSource;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadProvider;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteConfiguration;
+import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testutils.NamedTestResult;
-import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
-import org.apache.beam.sdk.testutils.metrics.MetricsReader;
-import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
-import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -58,9 +59,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-public class SingleStoreIOIT {
-
- private static final String NAMESPACE = SingleStoreIOIT.class.getName();
+public class SingleStoreIOSchemaTransformIT {
private static final String DATABASE_NAME = "SingleStoreIOIT";
@@ -78,8 +77,6 @@ public class SingleStoreIOIT {
private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
- private static InfluxDBSettings settings;
-
@BeforeClass
public static void setup() {
SingleStoreIOTestPipelineOptions options;
@@ -101,30 +98,12 @@ public class SingleStoreIOIT {
.withDatabase(DATABASE_NAME)
.withPassword(password)
.withUsername(username);
- settings =
- InfluxDBSettings.builder()
- .withHost(options.getInfluxHost())
- .withDatabase(options.getInfluxDatabase())
- .withMeasurement(options.getInfluxMeasurement())
- .get();
- }
-
- void createDatabaseIfNotExists() throws SQLException {
- DataSource dataSource =
- new SingleStoreDataSource(
- String.format(
- "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
- serverName, port, username, password));
- try (Connection conn = dataSource.getConnection();
- Statement stmt = conn.createStatement()) {
- stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_NAME));
- }
}
@Test
@Category(NeedsRunner.class)
public void testWriteThenRead() throws Exception {
- createDatabaseIfNotExists();
+ TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
DataSource dataSource =
new SingleStoreDataSource(
String.format(
@@ -138,76 +117,64 @@ public class SingleStoreIOIT {
assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
PipelineResult readResultWithPartitions = runReadWithPartitions();
assertEquals(PipelineResult.State.DONE, readResultWithPartitions.waitUntilFinish());
- gatherAndPublishMetrics(writeResult, readResult, readResultWithPartitions);
} finally {
DatabaseTestHelper.deleteTable(dataSource, tableName);
}
}
- private void gatherAndPublishMetrics(
- PipelineResult writeResult,
- PipelineResult readResult,
- PipelineResult readResultWithPartitions) {
- String uuid = UUID.randomUUID().toString();
- String timestamp = Timestamp.now().toString();
-
- IOITMetrics writeMetrics =
- new IOITMetrics(
- getMetricSuppliers(uuid, timestamp, "write_time"),
- writeResult,
- NAMESPACE,
- uuid,
- timestamp);
- writeMetrics.publishToInflux(settings);
-
- IOITMetrics readMetrics =
- new IOITMetrics(
- getMetricSuppliers(uuid, timestamp, "read_time"),
- readResult,
- NAMESPACE,
- uuid,
- timestamp);
- readMetrics.publishToInflux(settings);
-
- IOITMetrics readMetricsWithPartitions =
- new IOITMetrics(
- getMetricSuppliers(uuid, timestamp, "read_with_partitions_time"),
- readResultWithPartitions,
- NAMESPACE,
- uuid,
- timestamp);
- readMetricsWithPartitions.publishToInflux(settings);
- }
-
- private Set<Function<MetricsReader, NamedTestResult>> getMetricSuppliers(
- String uuid, String timestamp, String metricName) {
- Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
-
- suppliers.add(
- reader -> {
- long writeStart = reader.getStartTimeMetric(metricName);
- long writeEnd = reader.getEndTimeMetric(metricName);
- return NamedTestResult.create(uuid, timestamp, metricName, (writeEnd - writeStart) / 1e3);
- });
-
- return suppliers;
- }
-
@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();
@Rule public TestPipeline pipelineReadWithPartitions = TestPipeline.create();
private PipelineResult runWrite() {
- PCollection<Integer> writtenRows =
+ SchemaTransformProvider provider = new SingleStoreSchemaTransformWriteProvider();
+
+ SingleStoreSchemaTransformWriteConfiguration configuration =
+ SingleStoreSchemaTransformWriteConfiguration.builder()
+ .setDataSourceConfiguration(dataSourceConfiguration)
+ .setTable(tableName)
+ .setBatchSize(100)
+ .build();
+
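+ // The configuration travels as a Beam Row, the same representation used for cross-language expansion.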
+ Row configurationRow = configuration.toBeamRow();
+ SchemaTransform schemaTransform = provider.from(configurationRow);
+ PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+ schemaTransform.buildTransform();
+
+ Schema.Builder schemaBuilder = new Schema.Builder();
+ schemaBuilder.addField("id", Schema.FieldType.INT32);
+ schemaBuilder.addField("name", Schema.FieldType.STRING);
+ Schema schema = schemaBuilder.build();
+
+ PCollection<Row> rows =
pipelineWrite
.apply(GenerateSequence.from(0).to(numberOfRows))
.apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
- .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
.apply(
- SingleStoreIO.<TestRow>write()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable(tableName)
- .withUserDataMapper(new TestHelper.TestUserDataMapper()));
+ "Convert TestRows to Rows",
+ MapElements.into(TypeDescriptor.of(Row.class))
+ .via(
+ (SerializableFunction<TestRow, Row>)
+ testRow -> {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ rowBuilder.addValue(testRow.id());
+ rowBuilder.addValue(testRow.name());
+ return rowBuilder.build();
+ }))
+ .setRowSchema(schema);
+
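+ // Wrap the rows under the provider's input tag; the transform re-emits the written rows under its output tag.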
+ PCollectionRowTuple input =
+ PCollectionRowTuple.of(SingleStoreSchemaTransformWriteProvider.INPUT_TAG, rows);
+ String tag = provider.outputCollectionNames().get(0);
+ PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+ assertTrue(output.has(tag));
+ PCollection<Integer> writtenRows =
+ output
+ .get(tag)
+ .apply(
+ "Convert Rows to Integers",
+ MapElements.into(TypeDescriptor.of(Integer.class))
+ .via((SerializableFunction<Row, Integer>) row -> row.getInt32(0)));
PAssert.thatSingleton(writtenRows.apply("Sum All", Sum.integersGlobally()))
.isEqualTo(numberOfRows);
@@ -216,14 +183,32 @@ public class SingleStoreIOIT {
}
private PipelineResult runRead() {
+ SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+ SingleStoreSchemaTransformReadConfiguration configuration =
+ SingleStoreSchemaTransformReadConfiguration.builder()
+ .setDataSourceConfiguration(dataSourceConfiguration)
+ .setTable(tableName)
+ .setOutputParallelization(true)
+ .build();
+
+ Row configurationRow = configuration.toBeamRow();
+ SchemaTransform schemaTransform = provider.from(configurationRow);
+ PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+ schemaTransform.buildTransform();
+
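+ // Reads take no input collections, so the transform is applied to an empty PCollectionRowTuple.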
+ PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead);
+ String tag = provider.outputCollectionNames().get(0);
+ PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+ assertTrue(output.has(tag));
PCollection<TestRow> namesAndIds =
- pipelineRead
+ output
+ .get(tag)
.apply(
- SingleStoreIO.<TestRow>read()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable(tableName)
- .withRowMapper(new TestHelper.TestRowMapper()))
- .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")));
+ MapElements.into(TypeDescriptor.of(TestRow.class))
+ .via(
+ (SerializableFunction<Row, TestRow>)
+ row -> TestRow.create(row.getInt32(0), row.getString(1))));
testReadResult(namesAndIds);
@@ -231,14 +216,32 @@ public class SingleStoreIOIT {
}
private PipelineResult runReadWithPartitions() {
+ SchemaTransformProvider provider = new SingleStoreSchemaTransformReadProvider();
+
+ SingleStoreSchemaTransformReadConfiguration configuration =
+ SingleStoreSchemaTransformReadConfiguration.builder()
+ .setDataSourceConfiguration(dataSourceConfiguration)
+ .setTable(tableName)
+ .setWithPartitions(true)
+ .build();
+
+ Row configurationRow = configuration.toBeamRow();
+ SchemaTransform schemaTransform = provider.from(configurationRow);
+ PTransform<PCollectionRowTuple, PCollectionRowTuple> pCollectionRowTupleTransform =
+ schemaTransform.buildTransform();
+
+ PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions);
+ String tag = provider.outputCollectionNames().get(0);
+ PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+ assertTrue(output.has(tag));
PCollection<TestRow> namesAndIds =
- pipelineReadWithPartitions
+ output
+ .get(tag)
.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable(tableName)
- .withRowMapper(new TestHelper.TestRowMapper()))
- .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_with_partitions_time")));
+ MapElements.into(TypeDescriptor.of(TestRow.class))
+ .via(
+ (SerializableFunction<Row, TestRow>)
+ row -> TestRow.create(row.getInt32(0), row.getString(1))));
testReadResult(namesAndIds);
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
index 4b743e43cd1..6f50d419368 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreUtilTest.java
@@ -79,6 +79,23 @@ public class SingleStoreUtilTest {
}
}
+ private static class TestRowMapperWithCoder extends TestRowMapper
+ implements SingleStoreIO.RowMapperWithCoder<TestRow> {
+ @Override
+ public Coder<TestRow> getCoder() throws Exception {
+ return SerializableCoder.of(TestRow.class);
+ }
+ }
+
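+ // A RowMapperWithCoder supplies its own coder for inferCoder to use.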
+ @Test
+ public void testInferCoderFromRowMapper() {
+ SchemaRegistry sr = SchemaRegistry.createDefault();
+ CoderRegistry cr = CoderRegistry.createDefault();
+ Coder<TestRow> c = SerializableCoder.of(TestRow.class);
+
+ assertEquals(c, SingleStoreUtil.inferCoder(new TestRowMapperWithCoder(), cr, sr, LOG));
+ }
+
@Test
public void testInferCoderFromSchemaRegistry() {
SchemaRegistry sr = SchemaRegistry.createDefault();
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
index 7c0acc53909..e04785db4ee 100644
--- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/TestHelper.java
@@ -17,9 +17,14 @@
*/
package org.apache.beam.sdk.io.singlestore;
+import com.singlestore.jdbc.SingleStoreDataSource;
+import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.TestRow;
public class TestHelper {
@@ -67,4 +72,18 @@ public class TestHelper {
return "secretPass";
}
}
+
+ public static void createDatabaseIfNotExists(
+ String serverName, Integer port, String username, String password, String databaseName)
+ throws SQLException {
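+ // allowLocalInfile is enabled because the connector's write path loads rows with LOAD DATA LOCAL INFILE.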
+ DataSource dataSource =
+ new SingleStoreDataSource(
+ String.format(
+ "jdbc:singlestore://%s:%d/?user=%s&password=%s&allowLocalInfile=TRUE",
+ serverName, port, username, password));
+ try (Connection conn = dataSource.getConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.executeQuery(String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName));
+ }
+ }
}