You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/06/06 16:06:32 UTC
[beam] branch master updated: Clean up uses of == instead of === in ts sdk (#17732)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b623313707 Clean up uses of == instead of === in ts sdk (#17732)
4b623313707 is described below
commit 4b623313707df8a3c3846412f54edf2e3c947374
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Jun 6 12:06:23 2022 -0400
Clean up uses of == instead of === in ts sdk (#17732)
---
sdks/typescript/README.md | 2 +-
.../typescript/src/apache_beam/coders/js_coders.ts | 4 +--
.../src/apache_beam/coders/required_coders.ts | 12 +++----
.../typescript/src/apache_beam/coders/row_coder.ts | 13 ++++---
.../src/apache_beam/coders/standard_coders.ts | 2 +-
.../src/apache_beam/internal/pipeline.ts | 14 ++++----
sdks/typescript/src/apache_beam/pvalue.ts | 12 +++----
.../src/apache_beam/runners/artifacts.ts | 6 ++--
.../src/apache_beam/runners/direct_runner.ts | 11 +++---
.../apache_beam/runners/portable_runner/runner.ts | 6 ++--
sdks/typescript/src/apache_beam/runners/runner.ts | 12 +++----
sdks/typescript/src/apache_beam/serialization.ts | 2 +-
.../src/apache_beam/transforms/external.ts | 16 +++++----
.../src/apache_beam/transforms/flatten.ts | 2 +-
.../apache_beam/transforms/group_and_combine.ts | 12 ++++---
.../src/apache_beam/transforms/internal.ts | 2 +-
.../typescript/src/apache_beam/transforms/pardo.ts | 16 +++++----
sdks/typescript/src/apache_beam/transforms/sql.ts | 9 +++--
.../src/apache_beam/transforms/transform.ts | 6 ++--
.../src/apache_beam/transforms/window.ts | 2 +-
.../src/apache_beam/transforms/windowings.ts | 2 +-
sdks/typescript/src/apache_beam/utils/service.ts | 8 ++---
.../typescript/src/apache_beam/worker/operators.ts | 40 +++++++++++-----------
.../src/apache_beam/worker/pardo_context.ts | 20 +++++------
sdks/typescript/src/apache_beam/worker/state.ts | 8 ++---
sdks/typescript/src/apache_beam/worker/worker.ts | 12 +++----
sdks/typescript/test/standard_coders_test.ts | 2 +-
sdks/typescript/test/worker_test.ts | 6 ++--
28 files changed, 134 insertions(+), 125 deletions(-)
diff --git a/sdks/typescript/README.md b/sdks/typescript/README.md
index 745170bba65..bfde82acba8 100644
--- a/sdks/typescript/README.md
+++ b/sdks/typescript/README.md
@@ -142,7 +142,7 @@ on a portable runner) but the following big-ticket items remain.
* Enforce unique names for pipeline update.
- * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+ * Cleanup uses of var, this. Arrow functions.
* Avoid `any` return types (and re-enable check in compiler).
diff --git a/sdks/typescript/src/apache_beam/coders/js_coders.ts b/sdks/typescript/src/apache_beam/coders/js_coders.ts
index 542c9c1ae3a..202725c3988 100644
--- a/sdks/typescript/src/apache_beam/coders/js_coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/js_coders.ts
@@ -115,7 +115,7 @@ export class GeneralObjectCoder<T> implements Coder<T> {
};
encode(element: T, writer: Writer, context: Context) {
- if (element == null) {
+ if (element === null || element === undefined) {
// typeof is "object" but BSON can't handle it.
writer.string("Z");
} else {
@@ -128,7 +128,7 @@ export class GeneralObjectCoder<T> implements Coder<T> {
decode(reader: Reader, context: Context): T {
const typeMarker = reader.string();
- if (typeMarker == "Z") {
+ if (typeMarker === "Z") {
return null!;
} else {
const type = this.markerToTypes[typeMarker];
diff --git a/sdks/typescript/src/apache_beam/coders/required_coders.ts b/sdks/typescript/src/apache_beam/coders/required_coders.ts
index 0586009b842..288c7690d59 100644
--- a/sdks/typescript/src/apache_beam/coders/required_coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/required_coders.ts
@@ -271,7 +271,7 @@ export class IterableCoder<T> implements Coder<Iterable<T>> {
*/
encode(element: Iterable<T>, writer: Writer, context: Context) {
const elmLen = (element as Array<T>).length;
- if ((element as Array<T>).length !== undefined) {
+ if (elmLen !== undefined) {
const eArray = element as Array<T>;
writer.fixed32(swapEndian32(eArray.length));
for (let i = 0; i < eArray.length; ++i) {
@@ -542,13 +542,13 @@ export class PaneInfoCoder implements Coder<PaneInfo> {
private static chooseEncoding(value: PaneInfo): number {
if (
- (value.index == 0 && value.onTimeIndex == 0) ||
- value.timing == Timing.UNKNOWN
+ (value.index === 0 && value.onTimeIndex === 0) ||
+ value.timing === Timing.UNKNOWN
) {
return PaneInfoEncoding.NO_INDEX;
} else if (
- value.index == value.onTimeIndex ||
- value.timing == Timing.EARLY
+ value.index === value.onTimeIndex ||
+ value.timing === Timing.EARLY
) {
return PaneInfoEncoding.ONE_INDEX;
} else {
@@ -586,7 +586,7 @@ export class PaneInfoCoder implements Coder<PaneInfo> {
isFirst: isFirst,
isLast: isLast,
index: onlyIndex,
- onTimeIndex: timing == Timing.EARLY ? -1 : onlyIndex,
+ onTimeIndex: timing === Timing.EARLY ? -1 : onlyIndex,
timing: timing,
};
diff --git a/sdks/typescript/src/apache_beam/coders/row_coder.ts b/sdks/typescript/src/apache_beam/coders/row_coder.ts
index 9652610445f..d160154ead5 100644
--- a/sdks/typescript/src/apache_beam/coders/row_coder.ts
+++ b/sdks/typescript/src/apache_beam/coders/row_coder.ts
@@ -31,7 +31,6 @@ import {
StrUtf8Coder,
VarIntCoder,
} from "./standard_coders";
-import { Value } from "../proto/google/protobuf/struct";
const argsort = (x) =>
x
@@ -238,7 +237,7 @@ export class RowCoder implements Coder<any> {
if (this.schema.encodingPositionsSet) {
// Should never be duplicate encoding positions.
let encPosx = schema.fields.map((f: Field) => f.encodingPosition);
- if (encPosx.length != this.encodingPositions.length) {
+ if (encPosx.length !== this.encodingPositions.length) {
throw new Error(
`Schema with id ${this.schema.id} has encoding_positions_set=True, but not all fields have encoding_position set`
);
@@ -282,14 +281,14 @@ export class RowCoder implements Coder<any> {
let nullFields: number[] = [];
if (this.hasNullableFields) {
- if (attrs.some((attr) => attr == undefined)) {
+ if (attrs.some((attr) => attr === null || attr === undefined)) {
let running = 0;
attrs.forEach((attr, i) => {
- if (i && i % 8 == 0) {
+ if (i && i % 8 === 0) {
nullFields.push(running);
running = 0;
}
- running |= (attr == undefined ? 1 : 0) << i % 8;
+ running |= (attr === null || attr === undefined ? 1 : 0) << i % 8;
});
nullFields.push(running);
}
@@ -304,7 +303,7 @@ export class RowCoder implements Coder<any> {
positions.forEach((i) => {
let attr = attrs[i];
- if (attr == undefined) {
+ if (attr === null || attr === undefined) {
if (!this.fieldNullable[i]) {
throw new Error(
`Attempted to encode null for non-nullable field \"${this.schema.fields[i].name}\".`
@@ -334,7 +333,7 @@ export class RowCoder implements Coder<any> {
nulls = Array(nFields)
.fill(0)
.map((_, i) => {
- if (i % 8 == 0) {
+ if (i % 8 === 0) {
let chunk = Math.floor(i / 8);
running = chunk >= nullMask.length ? 0 : nullMask[chunk];
}
diff --git a/sdks/typescript/src/apache_beam/coders/standard_coders.ts b/sdks/typescript/src/apache_beam/coders/standard_coders.ts
index 5fdc4943afc..cb5562c200a 100644
--- a/sdks/typescript/src/apache_beam/coders/standard_coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/standard_coders.ts
@@ -176,7 +176,7 @@ export class NullableCoder<T> implements Coder<T | undefined> {
}
encode(element: T | undefined, writer: Writer, context: Context) {
- if (element == undefined) {
+ if (element === null || element === undefined) {
writer.bool(false);
} else {
writer.bool(true);
diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts
index f333bb6c99b..90f95325510 100644
--- a/sdks/typescript/src/apache_beam/internal/pipeline.ts
+++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts
@@ -43,7 +43,7 @@ export class PipelineContext {
getCoder<T>(coderId: string): Coder<T> {
const this_ = this;
- if (this.coders[coderId] == undefined) {
+ if (this.coders[coderId] === null || this.coders[coderId] === undefined) {
const coderProto = this.components.coders[coderId];
const components: Coder<unknown>[] = (
coderProto.componentCoderIds || []
@@ -70,7 +70,7 @@ export class PipelineContext {
}
getPCollectionCoderId<T>(pcoll: pvalue.PCollection<T>): string {
- const pcollId = typeof pcoll == "string" ? pcoll : pcoll.getId();
+ const pcollId = typeof pcoll === "string" ? pcoll : pcoll.getId();
return this.components!.pcollections[pcollId].coderId;
}
@@ -272,14 +272,14 @@ export class Pipeline {
const pcollId = this.context.createUniqueName("pc");
let coderId: string;
let windowingStrategyId: string;
- if (typeof coder == "string") {
+ if (typeof coder === "string") {
coderId = coder;
} else {
coderId = this.context.getCoderId(coder);
}
- if (windowingStrategy == undefined) {
+ if (windowingStrategy === null || windowingStrategy === undefined) {
windowingStrategyId = undefined!;
- } else if (typeof windowingStrategy == "string") {
+ } else if (typeof windowingStrategy === "string") {
windowingStrategyId = windowingStrategy;
} else {
windowingStrategyId = this.context.getWindowingStrategyId(
@@ -318,9 +318,9 @@ function onlyValueOr<T>(
defaultValue: T,
comparator: (a: T, b: T) => boolean = (a, b) => false
) {
- if (valueSet.size == 0) {
+ if (valueSet.size === 0) {
return defaultValue;
- } else if (valueSet.size == 1) {
+ } else if (valueSet.size === 1) {
return valueSet.values().next().value;
} else {
const candidate = valueSet.values().next().value;
diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts
index 1313f2241e7..a12c8bcf345 100644
--- a/sdks/typescript/src/apache_beam/pvalue.ts
+++ b/sdks/typescript/src/apache_beam/pvalue.ts
@@ -68,7 +68,7 @@ export class PCollection<T> {
constructor(pipeline: Pipeline, id: string | (() => string)) {
this.pipeline = pipeline;
- if (typeof id == "string") {
+ if (typeof id === "string") {
this.id = id;
} else {
this.computeId = id;
@@ -76,7 +76,7 @@ export class PCollection<T> {
}
getId(): string {
- if (this.id == undefined) {
+ if (this.id === null || this.id === undefined) {
this.id = this.computeId();
}
return this.id;
@@ -114,7 +114,7 @@ export class PCollection<T> {
process: function* (element: T, context: ContextT) {
// While it's legal to call a function with extra arguments which will
// be ignored, this can have surprising behavior (e.g. for map(console.log))
- yield context == undefined
+ yield context === null || context === undefined
? (fn as (T) => OutputT)(element)
: fn(element, context);
},
@@ -139,7 +139,7 @@ export class PCollection<T> {
process: function (element: T, context: ContextT) {
// While it's legal to call a function with extra arguments which will
// be ignored, this can have surprising behavior (e.g. for map(console.log))
- return context == undefined
+ return context === null || context === undefined
? (fn as (T) => Iterable<OutputT>)(element)
: fn(element, context);
},
@@ -176,7 +176,7 @@ export function flattenPValue<T>(
prefix: string = ""
): { [key: string]: PCollection<T> } {
const result: { [key: string]: PCollection<any> } = {};
- if (pValue == null) {
+ if (pValue === null || pValue === undefined) {
// pass
} else if (pValue instanceof Root) {
// pass
@@ -242,7 +242,7 @@ class PValueWrapper<T extends PValue<any>> {
}
private pipeline(root: Root | null = null) {
- if (root == null) {
+ if (root === null || root === undefined) {
const flat = flattenPValue(this.pvalue);
return Object.values(flat)[0].pipeline;
} else {
diff --git a/sdks/typescript/src/apache_beam/runners/artifacts.ts b/sdks/typescript/src/apache_beam/runners/artifacts.ts
index e5afa56d8c6..300c259056e 100644
--- a/sdks/typescript/src/apache_beam/runners/artifacts.ts
+++ b/sdks/typescript/src/apache_beam/runners/artifacts.ts
@@ -53,7 +53,7 @@ export async function* resolveArtifacts(
async function storeArtifact(
artifact: runnerApi.ArtifactInformation
): Promise<runnerApi.ArtifactInformation> {
- if (artifact.typeUrn == "beam:artifact:type:file:v1") {
+ if (artifact.typeUrn === "beam:artifact:type:file:v1") {
const payload = runnerApi.ArtifactFilePayload.fromBinary(
artifact.typePayload
);
@@ -102,8 +102,8 @@ export async function* resolveArtifacts(
for (const artifact of resolved.replacements) {
if (
- artifact.typeUrn == "beam:artifact:type:url:v1" ||
- artifact.typeUrn == "beam:artifact:type:embedded:v1"
+ artifact.typeUrn === "beam:artifact:type:url:v1" ||
+ artifact.typeUrn === "beam:artifact:type:embedded:v1"
) {
// TODO: (Typescript) Yield from asycn?
yield artifact;
diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
index 1482995eee5..21070ccfd7c 100644
--- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
@@ -78,7 +78,7 @@ export class DirectRunner extends Runner {
for (const env of Object.values(proto.components!.environments)) {
if (
env.urn &&
- env.urn != environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN
+ env.urn !== environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN
) {
yield env.urn;
}
@@ -196,7 +196,7 @@ class DirectGbkOperator implements operators.IOperator {
context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
// TODO: (Cleanup) Check or implement triggers, etc.
if (
- windowingStrategy.mergeStatus != runnerApi.MergeStatus_Enum.NON_MERGING
+ windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
) {
throw new Error("Non-merging WindowFn: " + windowingStrategy);
}
@@ -240,7 +240,7 @@ class DirectGbkOperator implements operators.IOperator {
timestamp: window.maxTimestamp(),
pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
});
- if (maybePromise != operators.NonPromise) {
+ if (maybePromise !== operators.NonPromise) {
await maybePromise;
}
}
@@ -304,8 +304,7 @@ function rewriteSideInputs(p: runnerApi.Pipeline, pipelineStateRef: string) {
const transforms = p.components!.transforms;
for (const [transformId, transform] of Object.entries(transforms)) {
if (
- transform.spec != undefined &&
- transform.spec.urn == parDo.urn &&
+ transform.spec?.urn === parDo.urn &&
Object.keys(transform.inputs).length > 1
) {
const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload);
@@ -469,7 +468,7 @@ class BufferOperator implements operators.IOperator {
async finishBundle() {
for (const element of this.elements) {
const maybePromise = this.receiver.receive(element);
- if (maybePromise != operators.NonPromise) {
+ if (maybePromise !== operators.NonPromise) {
await maybePromise;
}
}
diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
index f5281aa969d..8a8f3cbc204 100644
--- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
@@ -110,7 +110,7 @@ export class PortableRunner extends Runner {
private jobService: JobService | undefined = undefined
) {
super();
- if (typeof options == "string") {
+ if (typeof options === "string") {
this.defaultOptions = { jobEndpoint: options };
} else if (options) {
this.defaultOptions = options;
@@ -162,7 +162,7 @@ export class PortableRunner extends Runner {
}
let loopbackAddress: string | undefined = undefined;
- if ((options as any)?.environmentType == "LOOPBACK") {
+ if ((options as any)?.environmentType === "LOOPBACK") {
const workers = new ExternalWorkerPool();
loopbackAddress = await workers.start();
completionCallbacks.push(() => workers.stop());
@@ -173,7 +173,7 @@ export class PortableRunner extends Runner {
for (const [envId, env] of Object.entries(
pipeline.components!.environments
)) {
- if (env.urn == environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) {
+ if (env.urn === environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) {
if (loopbackAddress) {
pipeline.components!.environments[envId] =
environments.asExternalEnvironment(env, loopbackAddress);
diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts
index 7fc2ad79477..cd7f016a805 100644
--- a/sdks/typescript/src/apache_beam/runners/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/runner.ts
@@ -27,15 +27,15 @@ export interface PipelineResult {
export function createRunner(options): Runner {
let runnerConstructor: (any) => Runner;
- if (options.runner == undefined || options.runner == "default") {
+ if (options.runner === undefined || options.runner === "default") {
runnerConstructor = defaultRunner;
- } else if (options.runner == "direct") {
+ } else if (options.runner === "direct") {
runnerConstructor = require("./direct_runner").directRunner;
- } else if (options.runner == "universal") {
+ } else if (options.runner === "universal") {
runnerConstructor = require("./universal").universalRunner;
- } else if (options.runner == "flink") {
+ } else if (options.runner === "flink") {
runnerConstructor = require("./flink").flinkRunner;
- } else if (options.runner == "dataflow") {
+ } else if (options.runner === "dataflow") {
runnerConstructor = require("./dataflow").dataflowRunner;
} else {
throw new Error("Unknown runner: " + options.runner);
@@ -96,7 +96,7 @@ export function defaultRunner(defaultOptions: Object): Runner {
): Promise<PipelineResult> {
const directRunner =
require("./direct_runner").directRunner(defaultOptions);
- if (directRunner.unsupportedFeatures(pipeline, options).length == 0) {
+ if (directRunner.unsupportedFeatures(pipeline, options).length === 0) {
return directRunner.runPipeline(pipeline, options);
} else {
return require("./universal")
diff --git a/sdks/typescript/src/apache_beam/serialization.ts b/sdks/typescript/src/apache_beam/serialization.ts
index 1f5e20236b5..949d18924e1 100644
--- a/sdks/typescript/src/apache_beam/serialization.ts
+++ b/sdks/typescript/src/apache_beam/serialization.ts
@@ -90,7 +90,7 @@ function registerObject(qualified_name, value) {
}
function registerItem(name, entry) {
- if (!entry || typeof entry == "number" || typeof entry == "string") {
+ if (!entry || typeof entry === "number" || typeof entry === "string") {
// These already serialize just fine.
return;
} else if (registeredObjectSet.has(entry)) {
diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts
index bfb3ebe4182..af6d27e6623 100644
--- a/sdks/typescript/src/apache_beam/transforms/external.ts
+++ b/sdks/typescript/src/apache_beam/transforms/external.ts
@@ -85,7 +85,7 @@ class RawExternalTransform<
private inferPValueType: boolean = true
) {
super("External(" + urn + ")");
- if (payload == undefined) {
+ if (payload === null || payload === undefined) {
this.payload = undefined;
} else if (payload instanceof Uint8Array) {
this.payload = payload as Uint8Array;
@@ -93,7 +93,7 @@ class RawExternalTransform<
this.payload = encodeSchemaPayload(payload);
}
- if (typeof serviceProviderOrAddress == "string") {
+ if (typeof serviceProviderOrAddress === "string") {
this.serviceProvider = async () =>
new service.ExternalService(serviceProviderOrAddress);
} else {
@@ -185,7 +185,7 @@ class RawExternalTransform<
// Don't even bother creating a connection if there are no dependencies.
if (
Object.values(components.environments).every(
- (env) => env.dependencies.length == 0
+ (env) => env.dependencies.length === 0
)
) {
return components;
@@ -250,7 +250,7 @@ class RawExternalTransform<
);
if (newTags.length > 1) {
throw new Error("Ambiguous renaming of tags.");
- } else if (newTags.length == 1) {
+ } else if (newTags.length === 1) {
const missingTags = difference(
new Set(Object.keys(transformProto.inputs)),
new Set(Object.keys(response.transform!.inputs))
@@ -276,7 +276,9 @@ class RawExternalTransform<
t.inputs = Object.fromEntries(
Object.entries(t.inputs).map(([k, v]) => [
k,
- renamedInputs[v] != undefined ? renamedInputs[v] : v,
+ renamedInputs[v] !== null && renamedInputs[v] !== undefined
+ ? renamedInputs[v]
+ : v,
])
);
}
@@ -325,9 +327,9 @@ class RawExternalTransform<
// See: https://github.com/microsoft/TypeScript/issues/3628
if (this.inferPValueType) {
const outputKeys = [...Object.keys(response.transform!.outputs)];
- if (outputKeys.length == 0) {
+ if (outputKeys.length === 0) {
return null!;
- } else if (outputKeys.length == 1) {
+ } else if (outputKeys.length === 1) {
return new PCollection(
pipeline,
response.transform!.outputs[outputKeys[0]]
diff --git a/sdks/typescript/src/apache_beam/transforms/flatten.ts b/sdks/typescript/src/apache_beam/transforms/flatten.ts
index 57d29c236a5..4c5406fdd8a 100644
--- a/sdks/typescript/src/apache_beam/transforms/flatten.ts
+++ b/sdks/typescript/src/apache_beam/transforms/flatten.ts
@@ -38,7 +38,7 @@ export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
);
const coder =
- coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+ coders.size === 1 ? [...coders][0] : new GeneralObjectCoder<T>();
return pipeline.createPCollectionInternal<T>(coder);
}
diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
index 4f7de5a0d47..7055d51cd25 100644
--- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
+++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
@@ -66,7 +66,7 @@ export class GroupBy<T, K> extends PTransformClass<
) {
super();
[this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
- this.keyName = typeof this.keyNames == "string" ? this.keyNames : "key";
+ this.keyName = typeof this.keyNames === "string" ? this.keyNames : "key";
}
expand(input: PCollection<T>): PCollection<KV<K, Iterable<T>>> {
@@ -185,9 +185,9 @@ class GroupByAndCombine<T, O> extends PTransformClass<
)
.map(function constructResult(kv) {
const result = {};
- if (this_.keyNames == undefined) {
+ if (this_.keyNames === null || this_.keyNames === undefined) {
// Don't populate a key at all.
- } else if (typeof this_.keyNames == "string") {
+ } else if (typeof this_.keyNames === "string") {
result[this_.keyNames] = kv.key;
} else {
for (let i = 0; i < this_.keyNames.length; i++) {
@@ -224,7 +224,7 @@ export function countGlobally<T>(): PTransform<
}
function toCombineFn<I>(combiner: Combiner<I>): CombineFn<I, any, any> {
- if (typeof combiner == "function") {
+ if (typeof combiner === "function") {
return binaryCombineFn<I>(combiner);
} else {
return combiner;
@@ -244,7 +244,9 @@ function binaryCombineFn<I>(
createAccumulator: () => undefined,
addInput: (a, b) => (a === undefined ? b : combiner(a, b)),
mergeAccumulators: (accs) =>
- [...accs].filter((a) => a != undefined).reduce(combiner, undefined),
+ [...accs]
+ .filter((a) => a !== null && a !== undefined)
+ .reduce(combiner, undefined),
extractOutput: (a) => a,
};
}
diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts
index 1e257704cd1..5eedf3ded3a 100644
--- a/sdks/typescript/src/apache_beam/transforms/internal.ts
+++ b/sdks/typescript/src/apache_beam/transforms/internal.ts
@@ -124,7 +124,7 @@ export function groupByKey<K, V>(): PTransform<
pipelineComponents.pcollections[input.getId()].coderId
];
- if (inputCoderProto.spec!.urn != KVCoder.URN) {
+ if (inputCoderProto.spec!.urn !== KVCoder.URN) {
return input
.apply(
withCoderInternal(
diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts
index e05c0cb14f5..0d0d7f6af63 100644
--- a/sdks/typescript/src/apache_beam/transforms/pardo.ts
+++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts
@@ -67,7 +67,7 @@ export function parDo<
// Extract and populate side inputs from the context.
const sideInputs = {};
var contextCopy;
- if (typeof context == "object") {
+ if (typeof context === "object") {
contextCopy = Object.create(context as Object) as any;
const components = pipeline.context.components;
for (const [name, value] of Object.entries(context)) {
@@ -97,7 +97,7 @@ export function parDo<
windowMappingFn: {
urn: isGlobalSide
? urns.GLOBAL_WINDOW_MAPPING_FN_URN
- : mainWindowingStrategyId == sideWindowingStrategyId
+ : mainWindowingStrategyId === sideWindowingStrategyId
? urns.IDENTITY_WINDOW_MAPPING_FN_URN
: urns.ASSIGN_MAX_TIMESTAMP_WINDOW_MAPPING_FN_URN,
value: new Uint8Array(),
@@ -174,7 +174,7 @@ export function split<T extends { [key: string]: unknown }>(
options.unknownTagBehavior = "error";
}
if (
- options.unknownTagBehavior == "rename" &&
+ options.unknownTagBehavior === "rename" &&
!tags.includes(options.unknownTagName!)
) {
tags.push(options.unknownTagName!);
@@ -222,7 +222,7 @@ export class ParDoParam<T> {
// TODO: Nameing "get" seems to be special.
lookup(): T {
- if (this.provider == undefined) {
+ if (this.provider === undefined) {
throw new Error("Cannot be called outside of a DoFn's process method.");
}
@@ -313,9 +313,13 @@ export function singletonSideInput<T>(
accessPattern: "beam:side_input:iterable:v1",
toValue: (iter: Iterable<T>) => {
const asArray = Array.from(iter);
- if (asArray.length == 0 && defaultValue != undefined) {
+ if (
+ asArray.length === 0 &&
+ defaultValue !== null &&
+ defaultValue !== undefined
+ ) {
return defaultValue;
- } else if (asArray.length == 1) {
+ } else if (asArray.length === 1) {
return asArray[0];
} else {
throw new Error("Expected a single element, got " + asArray.length);
diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts
index 417a277609c..9170683c1cd 100644
--- a/sdks/typescript/src/apache_beam/transforms/sql.ts
+++ b/sdks/typescript/src/apache_beam/transforms/sql.ts
@@ -49,7 +49,7 @@ export function sqlTransform<
// TOOD: (API) (Typescript): How to infer input_types, or at least make it optional.
async function expandInternal(input: InputT): Promise<PCollection<any>> {
function withCoder<T>(pcoll: PCollection<T>, type): PCollection<T> {
- if (type == null) {
+ if (type) {
if (
!(
pcoll.pipeline.context.getPCollectionCoder(pcoll) instanceof
@@ -74,7 +74,12 @@ export function sqlTransform<
input = Object.fromEntries(
Object.keys(input).map((tag) => [
tag,
- withCoder(input[tag], inputTypes == null ? null : inputTypes[tag]),
+ withCoder(
+ input[tag],
+ inputTypes === null || inputTypes === undefined
+ ? null
+ : inputTypes[tag]
+ ),
])
) as InputT;
}
diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts
index 55b1da53a75..fe84009987b 100644
--- a/sdks/typescript/src/apache_beam/transforms/transform.ts
+++ b/sdks/typescript/src/apache_beam/transforms/transform.ts
@@ -27,13 +27,13 @@ export function withName<T>(name: string | (() => string), arg: T): T {
export function extractName<T>(withName: T): string {
const untyped = withName as any;
- if (untyped.beamName != undefined) {
- if (typeof untyped.beamName == "string") {
+ if (untyped.beamName !== null && untyped.beamName !== undefined) {
+ if (typeof untyped.beamName === "string") {
return untyped.beamName;
} else {
return untyped.beamName();
}
- } else if (untyped.name && untyped.name != "anonymous") {
+ } else if (untyped.name && untyped.name !== "anonymous") {
return untyped.name;
} else {
const stringified = ("" + withName)
diff --git a/sdks/typescript/src/apache_beam/transforms/window.ts b/sdks/typescript/src/apache_beam/transforms/window.ts
index 57a335ba580..d74570b6bf9 100644
--- a/sdks/typescript/src/apache_beam/transforms/window.ts
+++ b/sdks/typescript/src/apache_beam/transforms/window.ts
@@ -41,7 +41,7 @@ export function createWindowingStrategyProto(
windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
): runnerApi.WindowingStrategy {
let result: runnerApi.WindowingStrategy;
- if (windowingStrategyBase == undefined) {
+ if (windowingStrategyBase === null || windowingStrategyBase === undefined) {
result = {
windowFn: undefined!,
windowCoderId: undefined!,
diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts
index 89c4941ed07..999c238c391 100644
--- a/sdks/typescript/src/apache_beam/transforms/windowings.ts
+++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts
@@ -90,7 +90,7 @@ export function sessions(gapSeconds: number | Long): WindowFn<IntervalWindow> {
}
function secsToMillisLong(secs: number | Long): Long {
- if (typeof secs == "number") {
+ if (typeof secs === "number") {
return Long.fromValue(secs * 1000);
} else {
return secs.mul(1000);
diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts
index bb5554e3123..885cd8e44f8 100644
--- a/sdks/typescript/src/apache_beam/utils/service.ts
+++ b/sdks/typescript/src/apache_beam/utils/service.ts
@@ -155,7 +155,7 @@ export class JavaJarService extends SubprocessService {
static JAR_CACHE = path.join(BEAM_CACHE, "jars");
constructor(jar: string, args: string[] | undefined = undefined) {
- if (args == undefined) {
+ if (args === null || args === undefined) {
// TODO: (Extension) Should filesToStage be set at some higher level?
args = ["{{PORT}}", "--filesToStage=" + jar];
}
@@ -268,7 +268,7 @@ export class JavaJarService extends SubprocessService {
): string {
return (
[artifactId, appendix, version, classifier]
- .filter((s) => s != undefined)
+ .filter((s) => s !== null && s !== undefined)
.join("-") + ".jar"
);
}
@@ -281,7 +281,7 @@ export class PythonService extends SubprocessService {
for (const bin of ["python3", "python"]) {
try {
const result = childProcess.spawnSync(bin, ["--version"]);
- if (result.status == 0) {
+ if (result.status === 0) {
return bin;
}
} catch (err) {
@@ -317,7 +317,7 @@ export class PythonService extends SubprocessService {
[bootstrapScript],
{ encoding: "latin1" }
);
- if (result.status == 0) {
+ if (result.status === 0) {
console.debug(result.stdout);
const lines = result.stdout.trim().split("\n");
return lines[lines.length - 1];
diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts
index 366e794c0d6..e9a0718a854 100644
--- a/sdks/typescript/src/apache_beam/worker/operators.ts
+++ b/sdks/typescript/src/apache_beam/worker/operators.ts
@@ -29,8 +29,8 @@ import * as urns from "../internal/urns";
import { PipelineContext } from "../internal/pipeline";
import { deserializeFn } from "../internal/serialize";
import { Coder, Context as CoderContext } from "../coders/coders";
-import { Window, Instant, PaneInfo, WindowedValue } from "../values";
-import { parDo, DoFn, ParDoParam, SplitOptions } from "../transforms/pardo";
+import { Window, Instant, WindowedValue } from "../values";
+import { parDo, DoFn, SplitOptions } from "../transforms/pardo";
import { WindowFn } from "../transforms/window";
import {
@@ -47,14 +47,14 @@ export type ProcessResult = null | Promise<void>;
export class ProcessResultBuilder {
promises: Promise<void>[] = [];
add(result: ProcessResult) {
- if (result != NonPromise) {
+ if (result !== NonPromise) {
this.promises.push(result as Promise<void>);
}
}
build(): ProcessResult {
- if (this.promises.length == 0) {
+ if (this.promises.length === 0) {
return NonPromise;
- } else if (this.promises.length == 1) {
+ } else if (this.promises.length === 1) {
return this.promises[0];
} else {
return Promise.all(this.promises).then(() => void null);
@@ -75,7 +75,7 @@ export class Receiver {
constructor(private operators: IOperator[]) {}
receive(wvalue: WindowedValue<unknown>): ProcessResult {
- if (this.operators.length == 1) {
+ if (this.operators.length === 1) {
return this.operators[0].process(wvalue);
} else {
const result = new ProcessResultBuilder();
@@ -108,7 +108,7 @@ export function createOperator(
// Ensure receivers are eagerly created.
Object.values(transform.outputs).map(context.getReceiver);
let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
- if (operatorConstructor == undefined) {
+ if (operatorConstructor === null || operatorConstructor === undefined) {
throw new Error("Unknown transform type:" + transform.spec!.urn);
}
return operatorConstructor(transformId, transform, context);
@@ -192,7 +192,7 @@ class DataSourceOperator implements IOperator {
const maybePromise = this_.receiver.receive(
this_.coder.decode(reader, CoderContext.needsDelimiters)
);
- if (maybePromise != NonPromise) {
+ if (maybePromise !== NonPromise) {
await maybePromise;
}
}
@@ -349,7 +349,7 @@ class GenericParDoOperator implements IOperator {
}
process(wvalue: WindowedValue<unknown>) {
- if (this.augmentedContext && wvalue.windows.length != 1) {
+ if (this.augmentedContext && wvalue.windows.length !== 1) {
// We need to process each window separately.
// TODO: (Perf) We could inspect the context more deeply and allow some
// cases to go through.
@@ -396,7 +396,7 @@ class GenericParDoOperator implements IOperator {
// If we were able to do so without any deferred actions, process the
// element immediately.
- if (updateContextResult == NonPromise) {
+ if (updateContextResult === NonPromise) {
return reallyProcess();
} else {
// Otherwise return a promise that first waits for all the deferred
@@ -404,7 +404,7 @@ class GenericParDoOperator implements IOperator {
return (async () => {
await updateContextResult;
const update2 = this.paramProvider.update(wvalue);
- if (update2 != NonPromise) {
+ if (update2 !== NonPromise) {
throw new Error("Expected all promises to be resolved: " + update2);
}
await reallyProcess();
@@ -423,7 +423,7 @@ class GenericParDoOperator implements IOperator {
// elements from different windows, so each element must specify its window.
for (const element of finishBundleOutput) {
const maybePromise = this.receiver.receive(element);
- if (maybePromise != NonPromise) {
+ if (maybePromise !== NonPromise) {
await maybePromise;
}
}
@@ -454,16 +454,16 @@ class SplittingDoFnOperator implements IOperator {
process(wvalue: WindowedValue<unknown>) {
const result = new ProcessResultBuilder();
const keys = Object.keys(wvalue.value as object);
- if (this.options.exclusive && keys.length != 1) {
+ if (this.options.exclusive && keys.length !== 1) {
throw new Error(
"Multiple keys for exclusively split element: " + wvalue.value
);
}
for (let tag of keys) {
if (!this.options.knownTags!.includes(tag)) {
- if (this.options.unknownTagBehavior == "rename") {
+ if (this.options.unknownTagBehavior === "rename") {
tag = this.options.unknownTagName!;
- } else if (this.options.unknownTagBehavior == "ignore") {
+ } else if (this.options.unknownTagBehavior === "ignore") {
continue;
} else {
throw new Error(
@@ -550,7 +550,7 @@ registerOperatorConstructor(
);
const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload);
// TODO: (Cleanup) Ideally we could branch on the urn itself, but some runners have a closed set of known URNs.
- if (spec.doFn?.urn == urns.SERIALIZED_JS_DOFN_INFO) {
+ if (spec.doFn?.urn === urns.SERIALIZED_JS_DOFN_INFO) {
return new GenericParDoOperator(
transformId,
context.getReceiver(onlyElement(Object.values(transform.outputs))),
@@ -559,21 +559,21 @@ registerOperatorConstructor(
transform,
context
);
- } else if (spec.doFn?.urn == urns.IDENTITY_DOFN_URN) {
+ } else if (spec.doFn?.urn === urns.IDENTITY_DOFN_URN) {
return new IdentityParDoOperator(
context.getReceiver(onlyElement(Object.values(transform.outputs)))
);
- } else if (spec.doFn?.urn == urns.JS_WINDOW_INTO_DOFN_URN) {
+ } else if (spec.doFn?.urn === urns.JS_WINDOW_INTO_DOFN_URN) {
return new AssignWindowsParDoOperator(
context.getReceiver(onlyElement(Object.values(transform.outputs))),
deserializeFn(spec.doFn.payload!).windowFn
);
- } else if (spec.doFn?.urn == urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN) {
+ } else if (spec.doFn?.urn === urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN) {
return new AssignTimestampsParDoOperator(
context.getReceiver(onlyElement(Object.values(transform.outputs))),
deserializeFn(spec.doFn.payload!).func
);
- } else if (spec.doFn?.urn == urns.SPLITTING_JS_DOFN_URN) {
+ } else if (spec.doFn?.urn === urns.SPLITTING_JS_DOFN_URN) {
return new SplittingDoFnOperator(
Object.fromEntries(
Object.entries(transform.outputs).map(([tag, pcId]) => [
diff --git a/sdks/typescript/src/apache_beam/worker/pardo_context.ts b/sdks/typescript/src/apache_beam/worker/pardo_context.ts
index fcf12b67c80..c810430abe4 100644
--- a/sdks/typescript/src/apache_beam/worker/pardo_context.ts
+++ b/sdks/typescript/src/apache_beam/worker/pardo_context.ts
@@ -18,10 +18,8 @@
import * as protobufjs from "protobufjs";
-import { PTransform, PCollection } from "../proto/beam_runner_api";
import * as runnerApi from "../proto/beam_runner_api";
import * as fnApi from "../proto/beam_fn_api";
-import { MultiplexingDataChannel, IDataChannel } from "./data";
import { StateProvider } from "./state";
import * as urns from "../internal/urns";
@@ -65,7 +63,7 @@ export class ParamProviderImpl implements ParamProvider {
// if they are widely shared.
augmentContext(context: any) {
this.prefetchCallbacks = [];
- if (typeof context != "object") {
+ if (typeof context !== "object") {
return context;
}
@@ -73,13 +71,13 @@ export class ParamProviderImpl implements ParamProvider {
for (const [name, value] of Object.entries(context)) {
// Is this the best way to check post serialization?
if (
- typeof value == "object" &&
- value != null &&
- value["parDoParamName"] != undefined
+ typeof value === "object" &&
+ value !== null &&
+ value["parDoParamName"] !== undefined
) {
result[name] = Object.create(value);
result[name].provider = this;
- if ((value as ParDoParam<unknown>).parDoParamName == "sideInput") {
+ if ((value as ParDoParam<unknown>).parDoParamName === "sideInput") {
this.prefetchCallbacks.push(
this.prefetchSideInput(
value as SideInputParam<unknown, unknown, unknown>
@@ -121,7 +119,7 @@ export class ParamProviderImpl implements ParamProvider {
windowCoder
);
const lookupResult = stateProvider.getState(stateKey, decode);
- if (lookupResult.type == "value") {
+ if (lookupResult.type === "value") {
this_.sideInputValues.set(param.sideInputId, lookupResult.value);
return operators.NonPromise;
} else {
@@ -134,12 +132,12 @@ export class ParamProviderImpl implements ParamProvider {
update(wvalue: WindowedValue<unknown> | undefined): operators.ProcessResult {
this.wvalue = wvalue;
- if (wvalue == undefined) {
+ if (wvalue === null || wvalue === undefined) {
return operators.NonPromise;
}
// We have to prefetch all the side inputs.
// TODO: (API) Let the user's process() await them.
- if (this.prefetchCallbacks.length == 0) {
+ if (this.prefetchCallbacks.length === 0) {
return operators.NonPromise;
} else {
const result = new operators.ProcessResultBuilder();
@@ -151,7 +149,7 @@ export class ParamProviderImpl implements ParamProvider {
}
provide(param) {
- if (this.wvalue == undefined) {
+ if (this.wvalue === null || this.wvalue === undefined) {
throw new Error(
param.parDoParamName + " not defined outside of a process() call."
);
diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts
index afc7f3ff722..ad8493ade20 100644
--- a/sdks/typescript/src/apache_beam/worker/state.ts
+++ b/sdks/typescript/src/apache_beam/worker/state.ts
@@ -66,7 +66,7 @@ export class CachingStateProvider implements StateProvider {
}
let result = this.underlying.getState(stateKey, decode);
const this_ = this;
- if (result.type == "promise") {
+ if (result.type === "promise") {
result = {
type: "promise",
promise: result.promise.then((value) => {
@@ -172,7 +172,7 @@ export class MultiplexingStateChannel {
return;
}
let getResponse: fnApi.StateGetResponse;
- if (response.response.oneofKind == "get") {
+ if (response.response.oneofKind === "get") {
getResponse = response.response.get;
} else {
reject("Expected get response " + response.response.oneofKind);
@@ -228,10 +228,10 @@ export class MultiplexingStateChannel {
}
export function Uint8ArrayConcat(chunks: Uint8Array[]) {
- if (chunks.length == 1) {
+ if (chunks.length === 1) {
// (Very) common case.
return chunks[0];
- } else if (chunks.length == 0) {
+ } else if (chunks.length === 0) {
return new Uint8Array();
} else {
const fullData = new Uint8Array(
diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts
index 8f30f26126f..1a9fea64c27 100644
--- a/sdks/typescript/src/apache_beam/worker/worker.ts
+++ b/sdks/typescript/src/apache_beam/worker/worker.ts
@@ -111,7 +111,7 @@ export class Worker {
async handleRequest(request) {
console.log(request);
- if (request.request.oneofKind == "processBundle") {
+ if (request.request.oneofKind === "processBundle") {
await this.process(request);
} else {
console.log("Unknown instruction type: ", request);
@@ -194,7 +194,7 @@ export class Worker {
this.bundleProcessors.set(descriptorId, []);
}
const processor = this.bundleProcessors.get(descriptorId)?.pop();
- if (processor != undefined) {
+ if (processor) {
return processor;
} else {
return new BundleProcessor(
@@ -321,8 +321,8 @@ export class BundleProcessor {
}
getStateProvider() {
- if (this.stateProvider == undefined) {
- if (typeof this.getStateChannel == "function") {
+ if (!this.stateProvider) {
+ if (typeof this.getStateChannel === "function") {
this.stateProvider = new CachingStateProvider(
new GrpcStateProvider(
this.getStateChannel(
@@ -339,7 +339,7 @@ export class BundleProcessor {
}
getBundleId() {
- if (this.currentBundleId == undefined) {
+ if (this.currentBundleId === null || this.currentBundleId === undefined) {
throw new Error("Not currently processing a bundle.");
}
return this.currentBundleId!;
@@ -373,7 +373,7 @@ function isPrimitive(transform: PTransform): boolean {
return true;
} else {
return (
- transform.subtransforms.length == 0 &&
+ transform.subtransforms.length === 0 &&
Object.values(transform.outputs).some((pcoll) => !inputs.includes(pcoll))
);
}
diff --git a/sdks/typescript/test/standard_coders_test.ts b/sdks/typescript/test/standard_coders_test.ts
index 3d34199fccb..46dec2b5cb7 100644
--- a/sdks/typescript/test/standard_coders_test.ts
+++ b/sdks/typescript/test/standard_coders_test.ts
@@ -94,7 +94,7 @@ const _urn_to_json_value_parser = {
timestamp: Long.fromNumber(x.timestamp),
}),
"beam:coder:nullable:v1": (components) => (x) => {
- return x == undefined ? undefined : components[0](x);
+ return x === null || x === undefined ? undefined : components[0](x);
},
};
diff --git a/sdks/typescript/test/worker_test.ts b/sdks/typescript/test/worker_test.ts
index 67051e36a84..96b00d6bab4 100644
--- a/sdks/typescript/test/worker_test.ts
+++ b/sdks/typescript/test/worker_test.ts
@@ -105,7 +105,7 @@ class Recording implements operators.IOperator {
process(wvalue: WindowedValue<any>) {
Recording.log.push(this.transformId + ".process(" + wvalue.value + ")");
const results = this.receivers.map((receiver) => receiver.receive(wvalue));
- if (!results.every((r) => r == operators.NonPromise)) {
+ if (!results.every((r) => r === operators.NonPromise)) {
throw new Error("Unexpected non-promise: " + results);
}
return operators.NonPromise;
@@ -134,10 +134,10 @@ class Partition implements operators.IOperator {
process(wvalue: WindowedValue<any>) {
const a = this.all.receive(wvalue);
- if (a != operators.NonPromise) throw new Error("Unexpected promise: " + a);
+ if (a !== operators.NonPromise) throw new Error("Unexpected promise: " + a);
if (wvalue.value.length > 3) {
const b = this.big.receive(wvalue);
- if (b != operators.NonPromise)
+ if (b !== operators.NonPromise)
throw new Error("Unexpected promise: " + b);
}
return operators.NonPromise;