Posted to github@beam.apache.org by "RustedBones (via GitHub)" <gi...@apache.org> on 2023/03/01 12:12:26 UTC

[GitHub] [beam] RustedBones opened a new pull request, #25672: Support for read from Cosmos DB Core SQL API

RustedBones opened a new pull request, #25672:
URL: https://github.com/apache/beam/pull/25672

   Fix #23604
   Follow-up from #23610 
   
   Implementation inspired by the official [azure-cosmos-spark](https://github.com/Azure/azure-sdk-for-java/blob/5178502fdf053afb59cc71482e4972470afae730/sdk/cosmos/azure-cosmos-spark_3_2-12/README.md)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1257596351


##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Thanks. Then can we change the comment that currently links to #20497 to something like "// CosmosAsyncClient does not use nullable annotations"?





[GitHub] [beam] RustedBones commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1254141190


##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosOptions.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import com.azure.cosmos.CosmosClientBuilder;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.azure.options.AzureOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   A bit painful that the checker does not see that `!Strings.isNullOrEmpty()` rules out null values. I will use a plain check and remove the annotation.
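
For illustration, the "plain check" mentioned above could look like the minimal sketch below. The class and method names (`EndpointCheck`, `endpointOrDefault`) are hypothetical and not taken from the PR; the point is only that an explicit `!= null` comparison is something a nullness checker can refine directly, so no `@SuppressWarnings("nullness")` is needed around it.

```java
// Hypothetical sketch: names are illustrative, not from the PR.
public class EndpointCheck {

  /** Returns the endpoint if it is set and non-empty, otherwise the fallback. */
  static String endpointOrDefault(String endpoint, String fallback) {
    // Explicit null comparison followed by the emptiness test. Per the
    // comment above, the checker did not see through the equivalent
    // !Strings.isNullOrEmpty(endpoint) call.
    if (endpoint != null && !endpoint.isEmpty()) {
      return endpoint;
    }
    return fallback;
  }

  public static void main(String[] args) {
    System.out.println(endpointOrDefault(null, "fallback"));
    System.out.println(endpointOrDefault("", "fallback"));
    System.out.println(endpointOrDefault("configured", "fallback"));
  }
}
```

In a real options class the parameter would carry a `@Nullable` annotation; it is omitted here so the sketch compiles without the Checker Framework on the classpath.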





[GitHub] [beam] codecov[bot] commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1450097445

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25672](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1806e53) into [master](https://codecov.io/gh/apache/beam/commit/3bff0733df017297d448316a07c318d6284a403f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3bff073) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #25672   +/-   ##
   =======================================
     Coverage   72.82%   72.82%           
   =======================================
     Files         775      775           
     Lines      102928   102928           
   =======================================
   + Hits        74955    74958    +3     
   + Misses      26520    26517    -3     
     Partials     1453     1453           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.96% <ø> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.50% <0.00%> (-1.27%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `90.97% <0.00%> (-0.76%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.22% <0.00%> (-0.12%)` | :arrow_down: |
   | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `81.88% <0.00%> (+0.14%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.46% <0.00%> (+0.15%)` | :arrow_up: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `93.13% <0.00%> (+0.63%)` | :arrow_up: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (+0.74%)` | :arrow_up: |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/25672?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `76.66% <0.00%> (+1.33%)` | :arrow_up: |
   




[GitHub] [beam] Abacn commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1253255757


##########
sdks/java/io/azure-cosmos/build.gradle:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmos")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB"
+
+dependencies {
+  implementation library.java.azure_cosmos
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.guava
+  implementation library.java.jackson_databind
+  // implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(path: ":sdks:java:io:azure")
+
+  testImplementation library.java.junit
+  testImplementation library.java.testcontainers_azure
+  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+//  testImplementation library.java.mockito_core

Review Comment:
   Remove these commented-out lines?



##########
CHANGES.md:
##########
@@ -60,6 +60,7 @@
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183))
+* Support for read from Cosmos DB Core SQL API [#23604](https://github.com/apache/beam/issues/23604)

Review Comment:
   This will now be in 2.50.0.



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   (optional) If possible, do not suppress nullness warnings in newly added code.



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;

Review Comment:
   Beam does not use wildcard imports: https://github.com/apache/beam/blob/619e41c7c0a64275dea7f02311e45a92637f43d8/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml#L82
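
As a small illustration of the style being asked for, the sketch below uses single-type imports where a wildcard such as `import java.util.*;` would otherwise appear. It uses `java.util` as a stand-in so that it compiles on its own; the class name `ExplicitImports` and its method are hypothetical, and the same pattern would apply to the `com.azure.cosmos` imports in the diff.

```java
// Each type is imported by name instead of `import java.util.*;`.
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch: class and method names are illustrative only.
public class ExplicitImports {

  /** Returns the distinct input names in sorted order. */
  static List<String> sortedDistinct(List<String> names) {
    // TreeSet removes duplicates and keeps natural (lexicographic) order.
    return new ArrayList<>(new TreeSet<>(names));
  }

  public static void main(String[] args) {
    List<String> names = new ArrayList<>();
    names.add("CosmosClientBuilder");
    names.add("CosmosAsyncClient");
    names.add("CosmosClientBuilder");
    System.out.println(sortedDistinct(names));
  }
}
```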



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CosmosIO {
+
+  private CosmosIO() {}
+
+  private static final String DEFAULT_QUERY = "SELECT * FROM root";
+
+  /** Provide a {@link Read} {@link PTransform} to read data from a Cosmos DB. */
+  public static <T> Read<T> read(Class<T> classType) {
+    return new AutoValue_CosmosIO_Read.Builder<T>().setClassType(classType).build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  @SuppressWarnings({"rawtypes"})
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable Class<T> getClassType();
+
+    abstract @Nullable String getDatabase();
+
+    abstract @Nullable String getContainer();
+
+    abstract @Nullable String getQuery();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setClassType(Class<T> classType);
+
+      abstract Builder<T> setDatabase(String database);
+
+      abstract Builder<T> setContainer(String container);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Read<T> build();
+    }
+
+    /** Specify the Cosmos database to read from. */
+    public Read<T> withDatabase(String database) {
+      checkArgument(database != null, "database can not be null");
+      checkArgument(!database.isEmpty(), "database can not be empty");
+      return builder().setDatabase(database).build();
+    }
+
+    /** Specify the Cosmos container to read from. */
+    public Read<T> withContainer(String container) {
+      checkArgument(container != null, "container can not be null");
+      checkArgument(!container.isEmpty(), "container can not be empty");
+      return builder().setContainer(container).build();
+    }
+
+    /** Specify the query to read data. */
+    public Read<T> withQuery(String query) {
+      return builder().setQuery(query).build();
+    }
+
+    /** Specify the {@link Coder} used to serialize the document in the {@link PCollection}. */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkState(getDatabase() != null, "withDatabase() is required");
+      checkState(getContainer() != null, "withContainer() is required");
+      checkState(getCoder() != null, "withCoder() is required");
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedCosmosBDSource<>(this)));
+    }
+  }
+
+  /** A {@link BoundedSource} reading from Comos. */
+  @VisibleForTesting
+  public static class BoundedCosmosBDSource<T> extends BoundedSource<T> {
+
+    private final Read<T> spec;
+    private final NormalizedRange range;
+
+    private @Nullable Long estimatedByteSize;
+
+    BoundedCosmosBDSource(Read<T> spec) {
+      this(spec, NormalizedRange.FULL_RANGE, null);
+    }
+
+    BoundedCosmosBDSource(Read<T> spec, NormalizedRange range, Long estimatedSize) {
+      this.spec = spec;
+      this.range = range;
+      this.estimatedByteSize = estimatedSize;
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncDatabase database = client.getDatabase(spec.getDatabase());
+        CosmosAsyncContainer container = database.getContainer(spec.getContainer());
+        AsyncDocumentClient document = CosmosBridgeInternal.getAsyncDocumentClient(client);
+
+        List<BoundedCosmosBDSource<T>> sources = new ArrayList<>();
+        long rangeSize = getEstimatedSizeBytes(options);
+        float splitsFloat = (float) rangeSize / desiredBundleSizeBytes;
+        int splits = (int) Math.ceil(splitsFloat);
+
+        // match internal impl of CosmosAsyncContainer trySplitFeedRange
+        String databaseLink =
+            ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor()
+                .getLink(database);
+        String containerLink =
+            databaseLink + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/" + container.getId();
+        Mono<Utils.ValueHolder<DocumentCollection>> getCollectionObservable =
+            document
+                .getCollectionCache()
+                .resolveByNameAsync(null, containerLink, null)
+                .map(Utils.ValueHolder::initialize);
+
+        List<NormalizedRange> subRanges =
+            FeedRangeInternal.convert(range.toFeedRange())
+                .trySplit(
+                    document.getPartitionKeyRangeCache(), null, getCollectionObservable, splits)
+                .block().stream()
+                .map(NormalizedRange::fromFeedRange)
+                .collect(Collectors.toList());
+
+        long estimatedSubRangeSize = rangeSize / subRanges.size();
+        for (NormalizedRange subrange : subRanges) {
+          sources.add(new BoundedCosmosBDSource<>(spec, subrange, estimatedSubRangeSize));
+        }
+
+        return sources;
+      }
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      if (estimatedByteSize != null) {
+        return estimatedByteSize;
+      }
+      CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncContainer container =
+            client.getDatabase(spec.getDatabase()).getContainer(spec.getContainer());
+
+        CosmosChangeFeedRequestOptions requestOptions =
+            CosmosChangeFeedRequestOptions.createForProcessingFromNow(range.toFeedRange());
+        requestOptions.setMaxItemCount(1);
+        requestOptions.setMaxPrefetchPageCount(1);
+        requestOptions.setQuotaInfoEnabled(true);
+
+        estimatedByteSize =
+            container
+                .queryChangeFeed(requestOptions, ObjectNode.class)
+                .byPage()
+                .take(1)
+                .map(FeedResponse::getDocumentUsage)
+                .map(kb -> kb * 1000)

Review Comment:
   Just to confirm: should this be `kb*1000` or `kb*1024`? Also, please consider adding a comment for the number literal used in the code.
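
One way to address the literal, whichever factor turns out to be right, is a named and commented constant. The sketch below is an assumption-laden illustration: `DocumentUsage`, `BYTES_PER_KB`, and `usageKbToBytes` are hypothetical names, and the value 1000 simply mirrors the diff rather than confirming which unit the Cosmos service reports.

```java
// Hypothetical sketch: names are illustrative, not from the PR.
public class DocumentUsage {

  // Assumption: document usage is reported in decimal kilobytes
  // (1 KB = 1000 bytes), mirroring the factor used in the diff. If the
  // service reports binary units (1 KiB = 1024 bytes), only this
  // constant needs to change.
  static final long BYTES_PER_KB = 1000L;

  /** Converts a usage figure in KB to bytes, failing loudly on overflow. */
  static long usageKbToBytes(long usageKb) {
    return Math.multiplyExact(usageKb, BYTES_PER_KB);
  }

  public static void main(String[] args) {
    System.out.println(usageKbToBytes(3));
  }
}
```

With the constant in place, the mapping step in the stream could reference the helper instead of an inline `kb * 1000`.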



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosOptions.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import com.azure.cosmos.CosmosClientBuilder;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.azure.options.AzureOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Same here: if possible, do not suppress nullness warnings in new code.



##########
sdks/java/io/azure-cosmos/README.md:
##########
@@ -0,0 +1,65 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmos
+
+```shell
+gradle sdks:java:io:azure-cosmos:build
+```
+
+Valite code:

Review Comment:
   Typo: "Valite" should be "Validate".





[GitHub] [beam] Abacn commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1614764431

   Thanks, taking a look




[GitHub] [beam] RustedBones commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1254138979


##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   I think I'll have to keep this one as the `CosmosAsyncClient` does not use nullable annotations.
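
   For context, one general way to limit how far unannotated return values spread is to check them at the boundary. The sketch below is not from the PR: `UnannotatedClient` is a hypothetical stand-in for an SDK type without nullness annotations, and whether this pattern would satisfy the checker for `CosmosAsyncClient` has not been verified here.

```java
import java.util.Objects;

// Hypothetical illustration, not part of the PR.
final class NullnessBoundary {
  /** Stand-in for an SDK type whose methods carry no nullness annotations. */
  interface UnannotatedClient {
    String lookup(String key);
  }

  private NullnessBoundary() {}

  /** Turns a possibly-null SDK result into a value callers can treat as non-null. */
  static String lookupOrThrow(UnannotatedClient client, String key) {
    return Objects.requireNonNull(client.lookup(key), () -> "no value for key: " + key);
  }

  public static void main(String[] args) {
    // Toy client: knows a single key and returns null for everything else.
    UnannotatedClient client = key -> "db".equals(key) ? "orders" : null;
    System.out.println(lookupOrThrow(client, "db"));
  }
}
```

   The trade-off is an explicit runtime check at each call site in exchange for a narrower suppression scope.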





[GitHub] [beam] RustedBones commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1614673862

   @kennknowles & @Abacn, could you have a look, since you commented on https://github.com/apache/beam/pull/23610?




[GitHub] [beam] RustedBones commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1255501654


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -612,6 +612,7 @@ class BeamModulePlugin implements Plugin<Project> {
         aws_java_sdk2_regions                       : "software.amazon.awssdk:regions:$aws_java_sdk2_version",
         aws_java_sdk2_utils                         : "software.amazon.awssdk:utils:$aws_java_sdk2_version",
         aws_java_sdk2_profiles                      : "software.amazon.awssdk:profiles:$aws_java_sdk2_version",
+        azure_sdk_bom                               : "com.azure:azure-sdk-bom:1.2.14",

Review Comment:
   I updated this to use the `azure-sdk-bom` to ensure compatibility across Azure modules.





[GitHub] [beam] RustedBones commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1450063042

   R: @kennknowles as you were part of the previous discussion in the linked PR




[GitHub] [beam] github-actions[bot] commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1605456728

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.




[GitHub] [beam] github-actions[bot] commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1450064397

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] RustedBones commented on pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #25672:
URL: https://github.com/apache/beam/pull/25672#issuecomment-1450215315

   @Miuler, can you give it a try in a real setup? I've only tested with testcontainers.




[GitHub] [beam] RustedBones commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1254086836


##########
sdks/java/io/azure-cosmos/README.md:
##########
@@ -0,0 +1,65 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmos
+
+```shell
+gradle sdks:java:io:azure-cosmos:build
+```
+
+Valite code:

Review Comment:
   I will remove the `README` as it is not informative. Most of the information here is common to the whole Beam repo.





[GitHub] [beam] RustedBones commented on a diff in pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1254139328


##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CosmosIO {
+
+  private CosmosIO() {}
+
+  private static final String DEFAULT_QUERY = "SELECT * FROM root";
+
+  /** Provide a {@link Read} {@link PTransform} to read data from a Cosmos DB. */
+  public static <T> Read<T> read(Class<T> classType) {
+    return new AutoValue_CosmosIO_Read.Builder<T>().setClassType(classType).build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  @SuppressWarnings({"rawtypes"})
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable Class<T> getClassType();
+
+    abstract @Nullable String getDatabase();
+
+    abstract @Nullable String getContainer();
+
+    abstract @Nullable String getQuery();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setClassType(Class<T> classType);
+
+      abstract Builder<T> setDatabase(String database);
+
+      abstract Builder<T> setContainer(String container);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Read<T> build();
+    }
+
+    /** Specify the Cosmos database to read from. */
+    public Read<T> withDatabase(String database) {
+      checkArgument(database != null, "database can not be null");
+      checkArgument(!database.isEmpty(), "database can not be empty");
+      return builder().setDatabase(database).build();
+    }
+
+    /** Specify the Cosmos container to read from. */
+    public Read<T> withContainer(String container) {
+      checkArgument(container != null, "container can not be null");
+      checkArgument(!container.isEmpty(), "container can not be empty");
+      return builder().setContainer(container).build();
+    }
+
+    /** Specify the query to read data. */
+    public Read<T> withQuery(String query) {
+      return builder().setQuery(query).build();
+    }
+
+    /** Specify the {@link Coder} used to serialize the document in the {@link PCollection}. */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkState(getDatabase() != null, "withDatabase() is required");
+      checkState(getContainer() != null, "withContainer() is required");
+      checkState(getCoder() != null, "withCoder() is required");
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedCosmosBDSource<>(this)));
+    }
+  }
+
+  /** A {@link BoundedSource} reading from Comos. */
+  @VisibleForTesting
+  public static class BoundedCosmosBDSource<T> extends BoundedSource<T> {
+
+    private final Read<T> spec;
+    private final NormalizedRange range;
+
+    private @Nullable Long estimatedByteSize;
+
+    BoundedCosmosBDSource(Read<T> spec) {
+      this(spec, NormalizedRange.FULL_RANGE, null);
+    }
+
+    BoundedCosmosBDSource(Read<T> spec, NormalizedRange range, Long estimatedSize) {
+      this.spec = spec;
+      this.range = range;
+      this.estimatedByteSize = estimatedSize;
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncDatabase database = client.getDatabase(spec.getDatabase());
+        CosmosAsyncContainer container = database.getContainer(spec.getContainer());
+        AsyncDocumentClient document = CosmosBridgeInternal.getAsyncDocumentClient(client);
+
+        List<BoundedCosmosBDSource<T>> sources = new ArrayList<>();
+        long rangeSize = getEstimatedSizeBytes(options);
+        float splitsFloat = (float) rangeSize / desiredBundleSizeBytes;
+        int splits = (int) Math.ceil(splitsFloat);
+
+        // match internal impl of CosmosAsyncContainer trySplitFeedRange
+        String databaseLink =
+            ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor()
+                .getLink(database);
+        String containerLink =
+            databaseLink + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/" + container.getId();
+        Mono<Utils.ValueHolder<DocumentCollection>> getCollectionObservable =
+            document
+                .getCollectionCache()
+                .resolveByNameAsync(null, containerLink, null)
+                .map(Utils.ValueHolder::initialize);
+
+        List<NormalizedRange> subRanges =
+            FeedRangeInternal.convert(range.toFeedRange())
+                .trySplit(
+                    document.getPartitionKeyRangeCache(), null, getCollectionObservable, splits)
+                .block().stream()
+                .map(NormalizedRange::fromFeedRange)
+                .collect(Collectors.toList());
+
+        long estimatedSubRangeSize = rangeSize / subRanges.size();
+        for (NormalizedRange subrange : subRanges) {
+          sources.add(new BoundedCosmosBDSource<>(spec, subrange, estimatedSubRangeSize));
+        }
+
+        return sources;
+      }
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      if (estimatedByteSize != null) {
+        return estimatedByteSize;
+      }
+      CosmosClientBuilder builder = options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncContainer container =
+            client.getDatabase(spec.getDatabase()).getContainer(spec.getContainer());
+
+        CosmosChangeFeedRequestOptions requestOptions =
+            CosmosChangeFeedRequestOptions.createForProcessingFromNow(range.toFeedRange());
+        requestOptions.setMaxItemCount(1);
+        requestOptions.setMaxPrefetchPageCount(1);
+        requestOptions.setQuotaInfoEnabled(true);
+
+        estimatedByteSize =
+            container
+                .queryChangeFeed(requestOptions, ObjectNode.class)
+                .byPage()
+                .take(1)
+                .map(FeedResponse::getDocumentUsage)
+                .map(kb -> kb * 1000)

Review Comment:
   Right, good catch. Thanks!





[GitHub] [beam] Abacn merged pull request #25672: Support for read from Cosmos DB Core SQL API

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #25672:
URL: https://github.com/apache/beam/pull/25672

