Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/08 02:37:35 UTC

[GitHub] [beam] zhangt-nhlab opened a new pull request, #23075: add a new IO named DataLakeIO (#23074)

zhangt-nhlab opened a new pull request, #23075:
URL: https://github.com/apache/beam/pull/23075

   We developed a new IO named DataLakeIO, which supports Beam reading data from data lakes (Delta, Iceberg, Hudi) and writing data to data lakes (Delta, Iceberg, Hudi).
   
   Because Delta, Iceberg, and Hudi do not provide sufficient Java APIs for reading and writing, we use the Spark DataSource API to read and write data in DataLakeIO. Therefore, the Spark dependencies are needed.
   
   BeamDeltaTest, BeamIcebergTest, and BeamHudiTest show how to use the above features.
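   
   For illustration, here is a minimal read pipeline sketch using the API proposed in this PR (the table path, Spark settings, and String output type below are placeholders, not part of the PR):
   
   ```java
   // Sketch only: assumes the DataLakeIO API exactly as proposed in this PR.
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.io.datalake.DataLakeIO;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.spark.SparkConf;
   
   public class DataLakeReadExample {
     public static void main(String[] args) {
       // Hypothetical Spark settings; the exact configuration depends on the lake format.
       SparkConf sparkConf = new SparkConf()
           .setMaster("local[*]")
           .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension");
   
       Pipeline p = Pipeline.create();
       PCollection<String> rows =
           p.apply(
               DataLakeIO.<String>read()
                   .withPath("/tmp/delta/events")        // placeholder table location
                   .withFormat("delta")                  // "delta", "iceberg" or "hudi"
                   .withSparkConf(sparkConf)
                   .withRowMapper(row -> row.toString()) // RowMapper<String>: Spark Row -> element
                   .withCoder(StringUtf8Coder.of()));
       // apply further transforms to `rows` here
       p.run().waitUntilFinish();
     }
   }
   ```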
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r978335634


##########
sdks/java/io/datalake/build.gradle:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.datalake')
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Datalake"
+ext.summary = "IO to read from and write to Data Lake"
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+//    implementation library.java.slf4j_api
+    implementation library.java.vendored_guava_26_0_jre
+    implementation "org.apache.spark:spark-sql_2.12:3.1.2"
+    implementation "org.apache.spark:spark-core_2.12:3.1.2"
+    implementation "org.apache.spark:spark-streaming_2.12:3.1.2"

Review Comment:
   This IO would be really neat and I understand the motivation of using Spark underneath.
   Nevertheless, the spark dependency is rather problematic and I'm very concerned about the consequences ... 
   
   There's also a Spark runner, which supports both Spark 2.4 and Spark >= 3.1. This IO would certainly conflict with the Spark 2.4 runner. The Spark 3 runner is built in a way that it supports various versions of Spark 3 (the path from 3.1 to 3.3 is full of breaking changes), and Spark dependencies are typically provided (as available on the cluster). Even further, Spark comes with a massive tail of dependencies prone to causing conflicts with versions used in Beam.
   
   The one common candidate to mention here is Avro. Spark 3.1 is still using Avro 1.8 matching Beam's version, Spark 3.2 bumps Avro to 1.10 which is incompatible with Beam :/ This kinda exemplifies the maintenance headache ahead.
   
   Have you evaluated any alternative to using Spark underneath?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r978335763


##########
sdks/java/io/datalake/build.gradle:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.datalake')
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Datalake"
+ext.summary = "IO to read from and write to Data Lake"
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+//    implementation library.java.slf4j_api
+    implementation library.java.vendored_guava_26_0_jre
+    implementation "org.apache.spark:spark-sql_2.12:3.1.2"
+    implementation "org.apache.spark:spark-core_2.12:3.1.2"
+    implementation "org.apache.spark:spark-streaming_2.12:3.1.2"

Review Comment:
   cc @aromanenko-dev 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1249290335

   Reminder, please take a look at this pr: @kileys @Abacn @johnjcasey 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab closed pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab closed pull request #23075: add a new IO named DataLakeIO (#23074)
URL: https://github.com/apache/beam/pull/23075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1240158124

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kileys for label java.
   R: @Abacn for label build.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1253621206

   Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @damccorm for label build.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r979492981


##########
sdks/java/io/datalake/src/main/java/org/apache/beam/sdk/io/datalake/DataLakeIO.java:
##########
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datalake;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.*;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.*;
+
+public class DataLakeIO {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataLakeIO.class);
+
+    /**
+     * Read data from a data lake datasource.
+     *
+     * @param <T> Type of the data to be read.
+     */
+    public static <T> DataLakeIO.Read<T> read() {
+        return new AutoValue_DataLakeIO_Read.Builder<T>()
+                .setFetchSize(DEFAULT_FETCH_SIZE)
+                .build();
+    }
+
+    /**
+     * Like {@link #read}, but executes multiple instances of the query substituting each element of a
+     * {@link PCollection} as query parameters.
+     *
+     * @param <ParameterT> Type of the data representing query parameters.
+     * @param <OutputT> Type of the data to be read.
+     */
+    public static <ParameterT, OutputT> DataLakeIO.ReadAll<ParameterT, OutputT> readAll() {
+        return new AutoValue_DataLakeIO_ReadAll.Builder<ParameterT, OutputT>()
+                .setFetchSize(DEFAULT_FETCH_SIZE)
+                .build();
+    }
+
+
+    /** Write data to data lake. */
+    public static Write write() {
+        return new AutoValue_DataLakeIO_Write.Builder()
+                .build();
+    }
+
+    private static final int DEFAULT_FETCH_SIZE = 50_000;
+
+    private DataLakeIO() {}
+
+    /**
+     * An interface used by DataLakeIO.Read for converting each {@link Row} record into
+     * an element of the resulting {@link PCollection}.
+     */
+    @FunctionalInterface
+    public interface RowMapper<T> extends Serializable {
+        T mapRow(Row rowRecord) throws Exception;
+    }
+
+    /** Implementation of {@link #read}. */
+    @AutoValue
+    public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+        abstract @Nullable ValueProvider<String> getQuery();
+
+        abstract @Nullable String getPath();
+
+        abstract @Nullable String getFormat();
+
+        abstract @Nullable SparkConf getSparkConf();
+
+        abstract @Nullable RowMapper<T> getRowMapper();
+
+        abstract @Nullable Coder<T> getCoder();
+
+        abstract int getFetchSize();
+
+        abstract DataLakeIO.Read.Builder<T> toBuilder();
+
+        @AutoValue.Builder
+        abstract static class Builder<T> {
+
+            abstract DataLakeIO.Read.Builder<T> setQuery(ValueProvider<String> query);
+
+            abstract DataLakeIO.Read.Builder<T> setPath(String path);
+
+            abstract DataLakeIO.Read.Builder<T> setFormat(String format);
+
+            abstract DataLakeIO.Read.Builder<T> setSparkConf(SparkConf sparkConf);
+
+            abstract DataLakeIO.Read.Builder<T> setRowMapper(DataLakeIO.RowMapper<T> rowMapper);
+
+            abstract DataLakeIO.Read.Builder<T> setCoder(Coder<T> coder);
+
+            abstract DataLakeIO.Read.Builder<T> setFetchSize(int fetchSize);
+
+            abstract DataLakeIO.Read<T> build();
+        }
+
+        public DataLakeIO.Read<T> withQuery(String query) {
+            return withQuery(ValueProvider.StaticValueProvider.of(query));
+        }
+
+        public DataLakeIO.Read<T> withQuery(ValueProvider<String> query) {
+            return toBuilder().setQuery(query).build();
+        }
+
+        public DataLakeIO.Read<T> withPath(String path){
+            return toBuilder().setPath(path).build();
+        }
+
+        public DataLakeIO.Read<T> withFormat(String format){
+            checkArgument(format != null, "format can not be null");
+            return toBuilder().setFormat(format).build();
+        }
+
+        public DataLakeIO.Read<T> withSparkConf(SparkConf sparkConf){
+            checkArgument(sparkConf != null, "sparkConf can not be null");
+            return toBuilder().setSparkConf(sparkConf).build();
+        }
+
+        public DataLakeIO.Read<T> withRowMapper(DataLakeIO.RowMapper<T> rowMapper) {
+            checkArgument(rowMapper != null, "rowMapper can not be null");
+            return toBuilder().setRowMapper(rowMapper).build();
+        }
+
+        public DataLakeIO.Read<T> withCoder(Coder<T> coder) {
+            checkArgument(coder != null, "coder can not be null");
+            return toBuilder().setCoder(coder).build();
+        }
+
+        /**
+         * This method is used to set the size of the data that is going to be fetched and loaded in
+         * memory per every database call. Please refer to: {@link java.sql.Statement#setFetchSize(int)}
+         * It should ONLY be used if the default value throws memory errors.
+         */
+        public DataLakeIO.Read<T> withFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "fetch size must be > 0");
+            return toBuilder().setFetchSize(fetchSize).build();
+        }
+
+        @Override
+        public PCollection<T> expand(PBegin input) {
+            checkArgument(getRowMapper() != null, "withRowMapper() is required");
+
+            DataLakeIO.ReadAll<Void, T> readAll =
+                    DataLakeIO.<Void, T>readAll()
+                            .withQuery(getQuery())
+                            .withPath(getPath())
+                            .withFormat(getFormat())
+                            .withSparkConf(getSparkConf())
+                            .withRowMapper(getRowMapper())
+                            .withFetchSize(getFetchSize())
+                    ;
+
+            if (getCoder() != null) {
+                readAll = readAll.toBuilder().setCoder(getCoder()).build();
+            }
+            return input.apply(Create.of((Void) null)).apply(readAll);
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            if(getQuery() != null){
+                builder.add(DisplayData.item("query", getQuery()));
+            }
+            builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+            if (getCoder() != null) {
+                builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+            }
+        }
+    }
+
+    /** Implementation of {@link #readAll}. */
+    @AutoValue
+    public abstract static class ReadAll<ParameterT, OutputT>
+            extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
+
+        abstract @Nullable ValueProvider<String> getQuery();
+
+        abstract @Nullable String getPath();
+
+        abstract @Nullable String getFormat();
+
+        abstract @Nullable SparkConf getSparkConf();
+
+        abstract @Nullable RowMapper<OutputT> getRowMapper();
+
+        abstract @Nullable Coder<OutputT> getCoder();
+
+        abstract int getFetchSize();
+
+        abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> toBuilder();
+
+        @AutoValue.Builder
+        abstract static class Builder<ParameterT, OutputT> {
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setQuery(ValueProvider<String> query);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setPath(String path);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setFormat(String format);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setSparkConf(SparkConf sparkConf);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setRowMapper(DataLakeIO.RowMapper<OutputT> rowMapper);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setFetchSize(int fetchSize);
+
+            abstract DataLakeIO.ReadAll<ParameterT, OutputT> build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withQuery(String query) {
+            return withQuery(ValueProvider.StaticValueProvider.of(query));
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withQuery(ValueProvider<String> query) {
+            return toBuilder().setQuery(query).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withPath(String path){
+            return toBuilder().setPath(path).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withFormat(String format){
+            return toBuilder().setFormat(format).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withSparkConf(SparkConf sparkConf){
+            return toBuilder().setSparkConf(sparkConf).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withRowMapper(DataLakeIO.RowMapper<OutputT> rowMapper) {
+            checkArgument(
+                    rowMapper != null,
+                    "DataLakeIO.readAll().withRowMapper(rowMapper) called with null rowMapper");
+            return toBuilder().setRowMapper(rowMapper).build();
+        }
+
+        /**
+         * @deprecated
+         *     <p>{@link DataLakeIO} is able to infer appropriate coders from other parameters.
+         */
+        @Deprecated
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
+            checkArgument(coder != null, "DataLakeIO.readAll().withCoder(coder) called with null coder");
+            return toBuilder().setCoder(coder).build();
+        }
+
+        /**
+         * This method is used to set the size of the data that is going to be fetched and loaded in
+         * memory per every database call. Please refer to: {@link java.sql.Statement#setFetchSize(int)}
+         * It should ONLY be used if the default value throws memory errors.
+         */
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "fetch size must be >0");
+            return toBuilder().setFetchSize(fetchSize).build();
+        }
+
+        private Coder<OutputT> inferCoder(CoderRegistry registry, SchemaRegistry schemaRegistry) {
+            if (getCoder() != null) {
+                return getCoder();
+            } else {
+                DataLakeIO.RowMapper<OutputT> rowMapper = getRowMapper();
+                TypeDescriptor<OutputT> outputType =
+                        TypeDescriptors.extractFromTypeParameters(
+                                rowMapper,
+                                DataLakeIO.RowMapper.class,
+                                new TypeVariableExtractor<DataLakeIO.RowMapper<OutputT>, OutputT>() {});
+                try {
+                    return schemaRegistry.getSchemaCoder(outputType);
+                } catch (NoSuchSchemaException e) {
+                    LOG.warn(
+                            "Unable to infer a schema for type {}. Attempting to infer a coder without a schema.",
+                            outputType);
+                }
+                try {
+                    return registry.getCoder(outputType);
+                } catch (CannotProvideCoderException e) {
+                    LOG.warn("Unable to infer a coder for type {}", outputType);
+                    return null;
+                }
+            }
+        }
+
+        @Override
+        public PCollection<OutputT> expand(PCollection<ParameterT> input) {
+            Coder<OutputT> coder =
+                    inferCoder(
+                            input.getPipeline().getCoderRegistry(), input.getPipeline().getSchemaRegistry());
+            checkNotNull(
+                    coder,
+                    "Unable to infer a coder for DataLakeIO.readAll() transform. "
+                            + "Provide a coder via withCoder, or ensure that one can be inferred from the"
+                            + " provided RowMapper.");
+            PCollection<OutputT> output =
+                    input
+                            .apply(
+                                    ParDo.of(
+                                            new DataLakeIO.ReadFn<>(
+                                                    getPath(),
+                                                    getFormat(),
+                                                    getSparkConf(),
+                                                    getRowMapper()
+                                            )))
+                            .setCoder(coder);
+
+            try {
+                TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+                SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+                Schema schema = registry.getSchema(typeDesc);
+                output.setSchema(
+                        schema,
+                        typeDesc,
+                        registry.getToRowFunction(typeDesc),
+                        registry.getFromRowFunction(typeDesc));
+            } catch (NoSuchSchemaException e) {
+                // ignore
+            }
+
+            return output;
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            if(getQuery() != null){
+                builder.add(DisplayData.item("query", getQuery()));
+            }
+            builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+            if (getCoder() != null) {
+                builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+            }
+        }
+    }
+
+    private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {

Review Comment:
   Thanks, I'll learn SplittableDoFn and then re-implement it as an SDF instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r979492240


##########
sdks/java/io/datalake/build.gradle:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.datalake')
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Datalake"
+ext.summary = "IO to read from and write to Data Lake"
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+//    implementation library.java.slf4j_api
+    implementation library.java.vendored_guava_26_0_jre
+    implementation "org.apache.spark:spark-sql_2.12:3.1.2"
+    implementation "org.apache.spark:spark-core_2.12:3.1.2"
+    implementation "org.apache.spark:spark-streaming_2.12:3.1.2"

Review Comment:
   > This IO would be really neat and I understand the motivation of using Spark underneath. Nevertheless, the spark dependency is rather problematic and I'm very concerned about the consequences ...
   > 
   > There's also a Spark runner, which supports both Spark 2.4 and Spark >= 3.1. This IO would certainly conflict with the Spark 2.4 runner. The Spark 3 runner is built in a way that it supports various versions of Spark 3 (the path from 3.1 to 3.3 is full of breaking changes), and Spark dependencies are typically provided (as available on the cluster). Even further, Spark comes with a massive tail of dependencies prone to causing conflicts with versions used in Beam.
   > 
   > The one common candidate to mention here is Avro. Spark 3.1 is still using Avro 1.8 matching Beam's version, Spark 3.2 bumps Avro to 1.10 which is incompatible with Beam :/ This kinda exemplifies the maintenance headache ahead.
   > 
   > Have you evaluated any alternative to using Spark underneath?
   
   Your advice is great! I'll consider it and then think about other alternatives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1240269438

   @kileys 
   @Abacn
   @johnjcasey


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1249466890

   waiting on author


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r979492520


##########
sdks/java/io/datalake/build.gradle:
##########
@@ -0,0 +1,54 @@
+/*

Review Comment:
   Okay, I'll modify it according to this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #23075:
URL: https://github.com/apache/beam/pull/23075#discussion_r965997033


##########
sdks/java/io/datalake/build.gradle:
##########
@@ -0,0 +1,54 @@
+/*

Review Comment:
   Can you re-work these dependencies to match the pattern used for other IOs? See io/google-cloud-platform/build.gradle for an example.
   
   New dependencies themselves are included in buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.



##########
sdks/java/io/datalake/src/main/java/org/apache/beam/sdk/io/datalake/DataLakeIO.java:
##########
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datalake;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.*;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.*;
+
+public class DataLakeIO {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataLakeIO.class);
+
+    /**
+     * Read data from a data lake datasource.
+     *
+     * @param <T> Type of the data to be read.
+     */
+    public static <T> DataLakeIO.Read<T> read() {
+        return new AutoValue_DataLakeIO_Read.Builder<T>()
+                .setFetchSize(DEFAULT_FETCH_SIZE)
+                .build();
+    }
+
+    /**
+     * Like {@link #read}, but executes multiple instances of the query substituting each element of a
+     * {@link PCollection} as query parameters.
+     *
+     * @param <ParameterT> Type of the data representing query parameters.
+     * @param <OutputT> Type of the data to be read.
+     */
+    public static <ParameterT, OutputT> DataLakeIO.ReadAll<ParameterT, OutputT> readAll() {
+        return new AutoValue_DataLakeIO_ReadAll.Builder<ParameterT, OutputT>()
+                .setFetchSize(DEFAULT_FETCH_SIZE)
+                .build();
+    }
+
+
+    /** Write data to data lake. */
+    public static Write write() {
+        return new AutoValue_DataLakeIO_Write.Builder()
+                .build();
+    }
+
+    private static final int DEFAULT_FETCH_SIZE = 50_000;
+
+    private DataLakeIO() {}
+
+    /**
+     * An interface used by DataLakeIO.Read for converting each {@link Row} record into
+     * an element of the resulting {@link PCollection}.
+     */
+    @FunctionalInterface
+    public interface RowMapper<T> extends Serializable {
+        T mapRow(Row rowRecord) throws Exception;
+    }
+
+    /** Implementation of {@link #read}. */
+    @AutoValue
+    public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+        abstract @Nullable ValueProvider<String> getQuery();
+
+        abstract @Nullable String getPath();
+
+        abstract @Nullable String getFormat();
+
+        abstract @Nullable SparkConf getSparkConf();
+
+        abstract @Nullable RowMapper<T> getRowMapper();
+
+        abstract @Nullable Coder<T> getCoder();
+
+        abstract int getFetchSize();
+
+        abstract DataLakeIO.Read.Builder<T> toBuilder();
+
+        @AutoValue.Builder
+        abstract static class Builder<T> {
+
+            abstract DataLakeIO.Read.Builder<T> setQuery(ValueProvider<String> query);
+
+            abstract DataLakeIO.Read.Builder<T> setPath(String path);
+
+            abstract DataLakeIO.Read.Builder<T> setFormat(String format);
+
+            abstract DataLakeIO.Read.Builder<T> setSparkConf(SparkConf sparkConf);
+
+            abstract DataLakeIO.Read.Builder<T> setRowMapper(DataLakeIO.RowMapper<T> rowMapper);
+
+            abstract DataLakeIO.Read.Builder<T> setCoder(Coder<T> coder);
+
+            abstract DataLakeIO.Read.Builder<T> setFetchSize(int fetchSize);
+
+            abstract DataLakeIO.Read<T> build();
+        }
+
+        public DataLakeIO.Read<T> withQuery(String query) {
+            return withQuery(ValueProvider.StaticValueProvider.of(query));
+        }
+
+        public DataLakeIO.Read<T> withQuery(ValueProvider<String> query) {
+            return toBuilder().setQuery(query).build();
+        }
+
+        public DataLakeIO.Read<T> withPath(String path){
+            return toBuilder().setPath(path).build();
+        }
+
+        public DataLakeIO.Read<T> withFormat(String format){
+            checkArgument(format != null, "format can not be null");
+            return toBuilder().setFormat(format).build();
+        }
+
+        public DataLakeIO.Read<T> withSparkConf(SparkConf sparkConf){
+            checkArgument(sparkConf != null, "sparkConf can not be null");
+            return toBuilder().setSparkConf(sparkConf).build();
+        }
+
+        public DataLakeIO.Read<T> withRowMapper(DataLakeIO.RowMapper<T> rowMapper) {
+            checkArgument(rowMapper != null, "rowMapper can not be null");
+            return toBuilder().setRowMapper(rowMapper).build();
+        }
+
+        public DataLakeIO.Read<T> withCoder(Coder<T> coder) {
+            checkArgument(coder != null, "coder can not be null");
+            return toBuilder().setCoder(coder).build();
+        }
+
+        /**
+         * This method is used to set the size of the data that is going to be fetched and loaded in
+         * memory per every database call. Please refer to: {@link java.sql.Statement#setFetchSize(int)}
+         * It should ONLY be used if the default value throws memory errors.
+         */
+        public DataLakeIO.Read<T> withFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "fetch size must be > 0");
+            return toBuilder().setFetchSize(fetchSize).build();
+        }
+
+        @Override
+        public PCollection<T> expand(PBegin input) {
+            checkArgument(getRowMapper() != null, "withRowMapper() is required");
+
+            DataLakeIO.ReadAll<Void, T> readAll =
+                    DataLakeIO.<Void, T>readAll()
+                            .withQuery(getQuery())
+                            .withPath(getPath())
+                            .withFormat(getFormat())
+                            .withSparkConf(getSparkConf())
+                            .withRowMapper(getRowMapper())
+                            .withFetchSize(getFetchSize())
+                    ;
+
+            if (getCoder() != null) {
+                readAll = readAll.toBuilder().setCoder(getCoder()).build();
+            }
+            return input.apply(Create.of((Void) null)).apply(readAll);
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            if(getQuery() != null){
+                builder.add(DisplayData.item("query", getQuery()));
+            }
+            builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+            if (getCoder() != null) {
+                builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+            }
+        }
+    }
+
+    /** Implementation of {@link #readAll}. */
+    @AutoValue
+    public abstract static class ReadAll<ParameterT, OutputT>
+            extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
+
+        abstract @Nullable ValueProvider<String> getQuery();
+
+        abstract @Nullable String getPath();
+
+        abstract @Nullable String getFormat();
+
+        abstract @Nullable SparkConf getSparkConf();
+
+        abstract @Nullable RowMapper<OutputT> getRowMapper();
+
+        abstract @Nullable Coder<OutputT> getCoder();
+
+        abstract int getFetchSize();
+
+        abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> toBuilder();
+
+        @AutoValue.Builder
+        abstract static class Builder<ParameterT, OutputT> {
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setQuery(ValueProvider<String> query);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setPath(String path);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setFormat(String format);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setSparkConf(SparkConf sparkConf);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setRowMapper(DataLakeIO.RowMapper<OutputT> rowMapper);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+
+            abstract DataLakeIO.ReadAll.Builder<ParameterT, OutputT> setFetchSize(int fetchSize);
+
+            abstract DataLakeIO.ReadAll<ParameterT, OutputT> build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withQuery(String query) {
+            return withQuery(ValueProvider.StaticValueProvider.of(query));
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withQuery(ValueProvider<String> query) {
+            return toBuilder().setQuery(query).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withPath(String path){
+            return toBuilder().setPath(path).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withFormat(String format){
+            return toBuilder().setFormat(format).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withSparkConf(SparkConf sparkConf){
+            return toBuilder().setSparkConf(sparkConf).build();
+        }
+
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withRowMapper(DataLakeIO.RowMapper<OutputT> rowMapper) {
+            checkArgument(
+                    rowMapper != null,
+                    "DataLakeIO.readAll().withRowMapper(rowMapper) called with null rowMapper");
+            return toBuilder().setRowMapper(rowMapper).build();
+        }
+
+        /**
+         * @deprecated
+         *     <p>{@link DataLakeIO} is able to infer appropriate coders from other parameters.
+         */
+        @Deprecated
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
+            checkArgument(coder != null, "DataLakeIO.readAll().withCoder(coder) called with null coder");
+            return toBuilder().setCoder(coder).build();
+        }
+
+        /**
+         * This method is used to set the size of the data that is going to be fetched and loaded in
+         * memory per every database call. Please refer to: {@link java.sql.Statement#setFetchSize(int)}
+         * It should ONLY be used if the default value throws memory errors.
+         */
+        public DataLakeIO.ReadAll<ParameterT, OutputT> withFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "fetch size must be >0");
+            return toBuilder().setFetchSize(fetchSize).build();
+        }
+
+        private Coder<OutputT> inferCoder(CoderRegistry registry, SchemaRegistry schemaRegistry) {
+            if (getCoder() != null) {
+                return getCoder();
+            } else {
+                DataLakeIO.RowMapper<OutputT> rowMapper = getRowMapper();
+                TypeDescriptor<OutputT> outputType =
+                        TypeDescriptors.extractFromTypeParameters(
+                                rowMapper,
+                                DataLakeIO.RowMapper.class,
+                                new TypeVariableExtractor<DataLakeIO.RowMapper<OutputT>, OutputT>() {});
+                try {
+                    return schemaRegistry.getSchemaCoder(outputType);
+                } catch (NoSuchSchemaException e) {
+                    LOG.warn(
+                            "Unable to infer a schema for type {}. Attempting to infer a coder without a schema.",
+                            outputType);
+                }
+                try {
+                    return registry.getCoder(outputType);
+                } catch (CannotProvideCoderException e) {
+                    LOG.warn("Unable to infer a coder for type {}", outputType);
+                    return null;
+                }
+            }
+        }
+
+        @Override
+        public PCollection<OutputT> expand(PCollection<ParameterT> input) {
+            Coder<OutputT> coder =
+                    inferCoder(
+                            input.getPipeline().getCoderRegistry(), input.getPipeline().getSchemaRegistry());
+            checkNotNull(
+                    coder,
+                    "Unable to infer a coder for DataLakeIO.readAll() transform. "
+                            + "Provide a coder via withCoder, or ensure that one can be inferred from the"
+                            + " provided RowMapper.");
+            PCollection<OutputT> output =
+                    input
+                            .apply(
+                                    ParDo.of(
+                                            new DataLakeIO.ReadFn<>(
+                                                    getPath(),
+                                                    getFormat(),
+                                                    getSparkConf(),
+                                                    getRowMapper()
+                                            )))
+                            .setCoder(coder);
+
+            try {
+                TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+                SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+                Schema schema = registry.getSchema(typeDesc);
+                output.setSchema(
+                        schema,
+                        typeDesc,
+                        registry.getToRowFunction(typeDesc),
+                        registry.getFromRowFunction(typeDesc));
+            } catch (NoSuchSchemaException e) {
+                // ignore
+            }
+
+            return output;
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+            super.populateDisplayData(builder);
+            if(getQuery() != null){
+                builder.add(DisplayData.item("query", getQuery()));
+            }
+            builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+            if (getCoder() != null) {
+                builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+            }
+        }
+    }
+
+    private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {

Review Comment:
   Currently, we are trying to have all sources implemented using the SplittableDoFn pattern to enable scalability, and are doing our best to not include new sources that are not implemented as SDFs. Can this be re-implemented as an SDF instead?
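   
   For illustration, a minimal sketch of what an SDF-shaped read function could look like (the offset-per-partition restriction and the countPartitions()/readPartition() helpers below are assumptions for this sketch, not an agreed design):
   
   ```java
   // Sketch of a SplittableDoFn-style ReadFn (it would sit in place of the current ReadFn).
   // The restriction (one offset per table partition) and the partition helpers are
   // hypothetical placeholders.
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
   
   class SdfReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
   
     @GetInitialRestriction
     public OffsetRange initialRestriction(@Element ParameterT element) {
       // Hypothetical: one offset per partition/file of the lake table.
       return new OffsetRange(0, countPartitions());
     }
   
     @SplitRestriction
     public void splitRestriction(
         @Restriction OffsetRange range, OutputReceiver<OffsetRange> receiver) {
       // Let the runner schedule each partition independently.
       for (long i = range.getFrom(); i < range.getTo(); i++) {
         receiver.output(new OffsetRange(i, i + 1));
       }
     }
   
     @ProcessElement
     public void processElement(
         RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<OutputT> out) {
       for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
         // readPartition(i) stands in for reading one partition of the table.
         for (OutputT record : readPartition(i)) {
           out.output(record);
         }
       }
     }
   
     private long countPartitions() { return 1; } // placeholder
     private Iterable<OutputT> readPartition(long index) { // placeholder
       return java.util.Collections.emptyList();
     }
   }
   ```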



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1240167048

   # [Codecov](https://codecov.io/gh/apache/beam/pull/23075?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#23075](https://codecov.io/gh/apache/beam/pull/23075?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d4f5fa4) into [master](https://codecov.io/gh/apache/beam/commit/f3cdbbf13a19803cf4d81ce67f77b8c48a172cc7?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f3cdbbf) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #23075      +/-   ##
   ==========================================
   - Coverage   73.58%   73.58%   -0.01%     
   ==========================================
     Files         716      716              
     Lines       95301    95301              
   ==========================================
   - Hits        70127    70126       -1     
   - Misses      23878    23879       +1     
     Partials     1296     1296              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.40% <ø> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/23075?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/23075/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.38% <0.00%> (-1.62%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/23075/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.05% <0.00%> (-0.39%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/23075/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.42% <0.00%> (ø)` | |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/23075/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.09% <0.00%> (+0.15%)` | :arrow_up: |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/23075/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `94.00% <0.00%> (+1.00%)` | :arrow_up: |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1257335798

   > First of all - thanks for your contribution!
   > 
   > Before proceeding to review from my side, I'd like to know if there is a design doc or similar for this IO connector? It would be very helpful to understand the goals and the implementation of this connector in advance.
   > 
   > Also, several notes that are worth to mention:
   > 
   > 1. Please, create a new github issue for this feature.
   > 2. Please, avoid merging a `master` branch into your feature branch. Use `git rebase` instead.
   > 3. Run `./gradlew :sdks:java:io:datalake:check` locally before pushing your changes to origin.
   > 
   > You can find a Beam contribution guide here: https://beam.apache.org/contribute/get-started-contributing/
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] zhangt-nhlab commented on pull request #23075: add a new IO named DataLakeIO (#23074)

Posted by GitBox <gi...@apache.org>.
zhangt-nhlab commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-1257337413

   > > First of all - thanks for your contribution!
   > > Before proceeding to review from my side, I'd like to know if there is a design doc or similar for this IO connector? It would be very helpful to understand the goals and the implementation of this connector in advance.
   > > Also, several notes that are worth to mention:
   > > 
   > > 1. Please, create a new github issue for this feature.
   > > 2. Please, avoid merging a `master` branch into your feature branch. Use `git rebase` instead.
   > > 3. Run `./gradlew :sdks:java:io:datalake:check` locally before pushing your changes to origin.
   > > 
   > > You can find a Beam contribution guide here: https://beam.apache.org/contribute/get-started-contributing/
   
   Thank you for your reply! I will make my changes and create a new GitHub issue later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] add a new IO named DataLakeIO (#23074) [beam]

Posted by "aaltay (via GitHub)" <gi...@apache.org>.
aaltay commented on PR #23075:
URL: https://github.com/apache/beam/pull/23075#issuecomment-2078179706

   Was there any progress on getting this IO into Beam?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org