Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/10 22:34:31 UTC

[GitHub] [beam] pabloem commented on a diff in pull request #21723: Add several IOs to the typescript SDK.

pabloem commented on code in PR #21723:
URL: https://github.com/apache/beam/pull/21723#discussion_r894923232


##########
sdks/typescript/src/apache_beam/io/kafka.ts:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../../apache_beam";
+import * as external from "../transforms/external";
+import { Schema } from "../proto/schema";
+import { RowCoder } from "../coders/row_coder";
+import { serviceProviderFromJavaGradleTarget } from "../utils/service";
+import * as protobufjs from "protobufjs";
+import { camelToSnakeOptions } from "../utils/utils";
+
+const KAFKA_EXPANSION_GRADLE_TARGET =
+  "sdks:java:io:expansion-service:shadowJar";
+
+export type ReadFromKafkaOptions = {
+  keyDeserializer?: string;
+  valueDeserializer?: string;
+  startReadTime?: number;
+  maxNumRecords?: number;
+  maxReadTime?: number;
+  commitOffsetInFinalize?: boolean;
+  timestampPolicy?: "ProcessingTime" | "CreateTime" | "LogAppendTime";
+};
+
+const defaultReadFromKafkaOptions = {
+  keyDeserializer:
+    "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+  valueDeserializer:
+    "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+  timestampPolicy: "ProcessingTime",
+};
+
+export function readFromKafka(
+  consumerConfig: { [key: string]: string }, // TODO: Or a map?
+  topics: string[],
+  options: ReadFromKafkaOptions = {}
+): beam.AsyncPTransform<beam.Root, beam.PCollection<any>> {

Review Comment:
   About the return type: is it `any`? Could we parameterize it as `T` instead? I'm not sure what the best practice is in TypeScript.
   
   @KonradJanica: we're trying to have a `Row` type, i.e. an element with a schema. In JSON these are just objects. Is there a better way to type them than `any`? Here each element is a simple JSON-like object that carries only data, no functions.
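One possible answer, sketched under assumptions: give the transform a type parameter that defaults to a JSON-like `Row` type rather than `any`, so callers who know their schema can narrow it. `RowValue`, `Row`, `Collection`, and `readRows` are hypothetical names for illustration, not the SDK's API; `Collection<T>` stands in for `beam.PCollection<T>`.

```typescript
// A "row" is plain JSON-like data: no functions, only values.
type RowValue =
  | null
  | boolean
  | number
  | string
  | Uint8Array
  | RowValue[]
  | { [field: string]: RowValue };
type Row = { [field: string]: RowValue };

// Hypothetical stand-in for beam.PCollection<T>; only the element type matters here.
interface Collection<T> {
  elements: T[];
}

// Generic with a default: callers that know their schema can narrow T,
// while everyone else still gets Row instead of any.
function readRows<T extends Row = Row>(rawJson: string[]): Collection<T> {
  return { elements: rawJson.map((s) => JSON.parse(s) as T) };
}

// Declared with `type` (not `interface`) so it satisfies Row's index signature.
type Click = { user: string; count: number };

const clicks = readRows<Click>(['{"user":"a","count":2}']);
const total: number = clicks.elements[0].count; // typed as number, not any
const untyped = readRows(['{"x":1}']); // elements default to Row
```

The default type argument keeps call sites that don't care about the schema unchanged, while the `extends Row` bound still rules out elements carrying functions or class instances.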



##########
sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts:
##########
@@ -79,6 +79,17 @@ class PortableRunnerPipelineResult implements PipelineResult {
     return state;
   }
 
+  async cancel() {

Review Comment:
   Does it make sense to pass a completion callback to the `cancel` function itself?
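A minimal sketch of what that could look like, assuming a job service with a "request cancel" call and a "get state" call: request cancellation, poll until the job reaches a terminal state, then invoke the optional callback and resolve with that state. `FakeJobService`, `cancelAndWait`, and the polling interval are hypothetical; the state names are only loosely modeled on Beam's job states.

```typescript
type JobState = "RUNNING" | "CANCELLING" | "CANCELLED" | "DONE" | "FAILED";

const TERMINAL_STATES: ReadonlySet<JobState> = new Set<JobState>(["CANCELLED", "DONE", "FAILED"]);

// Hypothetical in-memory job service: after cancel is requested it reports
// CANCELLING for two polls, then CANCELLED.
class FakeJobService {
  private cancelRequested = false;
  private polls = 0;

  async cancel(_jobId: string): Promise<void> {
    this.cancelRequested = true;
  }

  async getState(_jobId: string): Promise<JobState> {
    if (!this.cancelRequested) return "RUNNING";
    this.polls += 1;
    return this.polls < 3 ? "CANCELLING" : "CANCELLED";
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Requests cancellation, polls until a terminal state, then calls the optional
// callback and resolves with the final state.
async function cancelAndWait(
  service: FakeJobService,
  jobId: string,
  onComplete?: (finalState: JobState) => void,
  pollIntervalMs = 1
): Promise<JobState> {
  await service.cancel(jobId);
  let state = await service.getState(jobId);
  while (!TERMINAL_STATES.has(state)) {
    await sleep(pollIntervalMs);
    state = await service.getState(jobId);
  }
  onComplete?.(state);
  return state;
}
```

Since the function is `async`, awaiting the returned promise already tells the caller when the job has stopped; the callback is only a convenience for callers that don't `await`.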



##########
sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts:
##########
@@ -137,6 +148,11 @@ export class PortableRunner extends Runner {
     return await call.response;
   }
 
+  async cancelJob(jobId: string) {

Review Comment:
   Ditto: "Does it make sense to pass a completion callback to the cancel function itself?"



##########
sdks/typescript/src/apache_beam/io/pubsub.ts:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as PubSub from "@google-cloud/pubsub";
+
+import * as beam from "../pvalue";
+import * as external from "../transforms/external";
+import * as internal from "../transforms/internal";
+import { AsyncPTransform, withName } from "../transforms/transform";
+import { RowCoder } from "../coders/row_coder";
+import { BytesCoder } from "../coders/required_coders";
+import { serviceProviderFromJavaGradleTarget } from "../utils/service";
+import { camelToSnakeOptions } from "../utils/utils";
+
+const PUBSUB_EXPANSION_GRADLE_TARGET =
+  "sdks:java:io:google-cloud-platform:expansion-service:shadowJar";
+
+const readSchema = RowCoder.inferSchemaOfJSON({
+  topic: "string",
+  subscription: "string",
+  idAttribute: "string",
+  timestampAttribute: "string",
+});
+
+type ReadOptions =
+  | {
+      topic: string;
+      subscription?: never;
+      idAttribute?: string;
+      timestampAttribute?: string;
+    }
+  | {
+      topic?: never;
+      subscription: string;
+      idAttribute?: string;
+      timestampAttribute?: string;
+    };
+
+// TODO: Schema-producing variants.
+export function readFromPubSub(
+  options: ReadOptions
+): AsyncPTransform<beam.Root, beam.PCollection<Uint8Array>> {
+  if (options.topic && options.subscription) {
+    throw new TypeError(
+      "Exactly one of topic or subscription must be provided."
+    );
+  }
+  return withName(
+    "readFromPubSubRaw",
+    external.rawExternalTransform<beam.Root, beam.PCollection<Uint8Array>>(
+      "beam:transform:org.apache.beam:pubsub_read:v1",
+      camelToSnakeOptions(options),
+      serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET)
+    )
+  );
+}

Review Comment:
   Is it worth having separate `read` and `readWithAttributes` transforms? I would personally prefer always including attributes, and only dropping them in something like a `readBytes` / `readStrings` transform. Thoughts?
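A minimal sketch of that layering, with plain arrays standing in for PCollections: the full read keeps attributes, and the bytes-only and string-only variants are thin projections on top of it. `PubSubMessage`, `readMessages`, `readBytes`, and `readStrings` are hypothetical names for illustration; in the SDK each projection would presumably be a map over the PCollection returned by the external read.

```typescript
// Hypothetical element shape for the attribute-preserving read: payload bytes plus string attributes.
interface PubSubMessage {
  data: Uint8Array;
  attributes: Record<string, string>;
}

// Stand-in for the full read, which keeps attributes (here it just passes in-memory messages through).
function readMessages(source: PubSubMessage[]): PubSubMessage[] {
  return source;
}

// Narrower variants project fields away from the full read instead of being separate sources.
function readBytes(source: PubSubMessage[]): Uint8Array[] {
  return readMessages(source).map((m) => m.data);
}

function readStrings(source: PubSubMessage[]): string[] {
  const decoder = new TextDecoder();
  return readBytes(source).map((bytes) => decoder.decode(bytes));
}

const messages: PubSubMessage[] = [
  { data: new TextEncoder().encode("hello"), attributes: { origin: "sensor-1" } },
];
```

With this shape there is a single source transform to maintain, and dropping attributes becomes an explicit, cheap choice by the caller.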



##########
sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts:
##########
@@ -79,6 +79,17 @@ class PortableRunnerPipelineResult implements PipelineResult {
     return state;
   }
 
+  async cancel() {

Review Comment:
   Feel free to add this as a TODO :)



##########
sdks/typescript/src/apache_beam/io/parquetio.ts:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../../apache_beam";
+import { StrUtf8Coder } from "../coders/standard_coders";
+import * as external from "../transforms/external";
+import { withCoderInternal } from "../transforms/internal";
+import { pythonTransform } from "../transforms/python";
+import { PythonService } from "../utils/service";
+import { camelToSnakeOptions } from "../utils/utils";
+import { Schema } from "../proto/schema";
+import { RowCoder } from "../coders/row_coder";
+

Review Comment:
   So cool that this can be done with pandas.



##########
sdks/typescript/src/apache_beam/worker/worker_main.ts:
##########
@@ -29,6 +29,8 @@ import { BeamFnLoggingClient } from "../proto/beam_fn_api.grpc-client";
 // Needed for registration.
 import * as row_coder from "../coders/row_coder";
 import * as combiners from "../transforms/combiners";
+import * as pubsub from "../io/pubsub";
+import * as assert from "../testing/assert";

Review Comment:
   Are these imports needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
