You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2020/12/18 00:14:01 UTC
[beam] branch master updated: [BEAM-11458] Upgrade SamzRunner to
Samza 1.5 (#13550)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3663e03 [BEAM-11458] Upgrade SamzRunner to Samza 1.5 (#13550)
3663e03 is described below
commit 3663e03644f44966ecd873c41f60186565893657
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Thu Dec 17 16:13:26 2020 -0800
[BEAM-11458] Upgrade SamzRunner to Samza 1.5 (#13550)
---
runners/samza/build.gradle | 17 +-
.../beam/runners/samza/SamzaJobInvocation.java | 114 +++++
.../beam/runners/samza/SamzaJobServerDriver.java | 60 ++-
.../beam/runners/samza/SamzaPipelineOptions.java | 22 +-
.../samza/SamzaPipelineOptionsValidator.java | 38 +-
.../samza/SamzaPortablePipelineOptions.java | 13 +
.../runners/samza/SamzaRunnerOverrideConfigs.java | 15 +
.../samza/adapter/UnboundedSourceSystem.java | 39 +-
.../samza/container/BeamContainerRunner.java | 6 +-
.../samza/container/BeamJobCoordinatorRunner.java | 78 ++++
...inerCfgFactory.java => ContainerCfgLoader.java} | 18 +-
.../ContainerCfgLoaderFactory.java} | 20 +-
.../beam/runners/samza/runtime/BundleManager.java | 349 +++++++++++++++
.../apache/beam/runners/samza/runtime/DoFnOp.java | 277 +++++++-----
.../runners/samza/runtime/FutureCollector.java | 60 +++
.../beam/runners/samza/runtime/GroupByKeyOp.java | 11 +-
.../beam/runners/samza/runtime/KeyedTimerData.java | 1 +
.../beam/runners/samza/runtime/OpAdapter.java | 48 ++-
.../beam/runners/samza/runtime/OpEmitter.java | 5 +
.../samza/runtime/OutputManagerFactory.java | 5 +
.../samza/runtime/SamzaStoreStateInternals.java | 230 ++++++----
.../samza/runtime/SamzaTimerInternalsFactory.java | 217 ++++++++--
.../runners/samza/translation/ConfigBuilder.java | 77 ++--
.../runners/samza/translation/ConfigContext.java | 8 +
.../translation/FlattenPCollectionsTranslator.java | 2 +-
.../samza/translation/GroupByKeyTranslator.java | 24 +-
.../translation/ParDoBoundMultiTranslator.java | 56 ++-
.../samza/translation/SamzaPipelineTranslator.java | 21 +-
.../translation/SamzaTestStreamSystemFactory.java | 179 ++++++++
.../translation/SamzaTestStreamTranslator.java | 100 +++++
.../translation/SplittableParDoTranslators.java | 6 +-
.../samza/translation/TranslationContext.java | 59 ++-
.../samza/translation/WindowAssignTranslator.java | 4 +-
.../beam/runners/samza/util/FutureUtils.java | 50 +++
.../samza/SamzaPipelineOptionsValidatorTest.java | 60 +++
.../samza/adapter/UnboundedSourceSystemTest.java | 28 ++
.../runners/samza/runtime/BundleManagerTest.java | 474 +++++++++++++++++++++
.../samza/runtime/FutureCollectorImplTest.java | 92 ++++
.../runners/samza/runtime/KeyedTimerDataTest.java | 14 +-
.../runtime/SamzaStoreStateInternalsTest.java | 38 +-
.../runtime/SamzaTimerInternalsFactoryTest.java | 416 +++++++++++++++++-
.../samza/translation/ConfigGeneratorTest.java | 86 +++-
.../samza/translation/TranslationContextTest.java | 94 ++++
.../beam/runners/samza/util/FutureUtilsTest.java | 107 +++++
.../site/content/en/documentation/runners/samza.md | 5 +
45 files changed, 3228 insertions(+), 415 deletions(-)
diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 3d02d9d..8a14b24 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -40,7 +40,7 @@ configurations {
validatesRunner
}
-def samza_version = "1.3.0"
+def samza_version = "1.5.0"
dependencies {
compile library.java.vendored_guava_26_0_jre
@@ -53,7 +53,13 @@ dependencies {
compile library.java.slf4j_api
compile library.java.joda_time
compile library.java.commons_io
+ compile library.java.commons_lang3
compile library.java.args4j
+ compile "javax.servlet:javax.servlet-api:3.1.0"
+ compile "io.dropwizard.metrics:metrics-core:3.1.2"
+ compile "org.rocksdb:rocksdbjni:5.7.3"
+ compile "org.apache.commons:commons-collections4:4.0"
+ compile "org.scala-lang:scala-library:2.11.8"
compile "org.apache.samza:samza-api:$samza_version"
compile "org.apache.samza:samza-core_2.11:$samza_version"
compile "org.apache.samza:samza-kafka_2.11:$samza_version"
@@ -87,11 +93,10 @@ task validatesRunner(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
@@ -99,6 +104,8 @@ task validatesRunner(type: Test) {
filter {
// TODO(BEAM-10025)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded'
+ // TODO(BEAM-11479)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
// These tests fail since there is no support for side inputs in Samza's unbounded splittable DoFn integration
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testWindowedSideInputWithCheckpointsUnbounded'
@@ -110,6 +117,10 @@ task validatesRunner(type: Test) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedUnbounded'
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded'
}
+ filter {
+ // Re-enable the test after Samza runner supports same state id across DoFn(s).
+ excludeTest('ParDoTest$StateTests', 'testValueStateSameId')
+ }
}
// Generates :runners:samza:runQuickstartJavaSamza
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
new file mode 100644
index 0000000..acaea62
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.jobsubmission.JobInvocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class SamzaJobInvocation extends JobInvocation {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+ private final SamzaPipelineOptions options;
+ private final RunnerApi.Pipeline originalPipeline;
+ private volatile SamzaPipelineResult pipelineResult;
+
+ public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) {
+ super(null, null, pipeline, null);
+ this.originalPipeline = pipeline;
+ this.options = options;
+ }
+
+ private SamzaPipelineResult invokeSamzaJob() {
+ // Fused pipeline proto.
+ final RunnerApi.Pipeline fusedPipeline =
+ GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+ // The pipeline options coming from the SDK set an SDK-specific runner, which would break
+ // serialization,
+ // so we need to reset the runner here to a valid Java runner.
+ options.setRunner(SamzaRunner.class);
+ try {
+ final SamzaRunner runner = SamzaRunner.fromOptions(options);
+ return (SamzaPortablePipelineResult) runner.runPortablePipeline(fusedPipeline);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke samza job", e);
+ }
+ }
+
+ @Override
+ public synchronized void start() {
+ LOG.info("Starting job invocation {}", getId());
+ pipelineResult = invokeSamzaJob();
+ }
+
+ @Override
+ public String getId() {
+ return options.getJobName();
+ }
+
+ @Override
+ public synchronized void cancel() {
+ try {
+ if (pipelineResult != null) {
+ LOG.info("Cancelling pipeline {}", getId());
+ pipelineResult.cancel();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to cancel job.", e);
+ }
+ }
+
+ @Override
+ public JobApi.JobState.Enum getState() {
+ if (pipelineResult == null) {
+ return STARTING;
+ }
+ switch (pipelineResult.getState()) {
+ case RUNNING:
+ return RUNNING;
+ case FAILED:
+ return FAILED;
+ case DONE:
+ return DONE;
+ case STOPPED:
+ return STOPPED;
+ case UPDATED:
+ return UPDATED;
+ case CANCELLED:
+ return CANCELLED;
+ default:
+ return UNRECOGNIZED;
+ }
+ }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index c3d1603..6895754 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -18,17 +18,18 @@
package org.apache.beam.runners.samza;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
+import org.apache.beam.sdk.expansion.service.ExpansionServer;
+import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
@@ -37,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Driver program that starts a job server. */
-// TODO extend JobServerDriver
+// TODO(BEAM-8510): extend JobServerDriver
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@@ -46,7 +47,7 @@ public class SamzaJobServerDriver {
private final SamzaPortablePipelineOptions pipelineOptions;
- private SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
+ protected SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
}
@@ -65,12 +66,13 @@ public class SamzaJobServerDriver {
overrideConfig.put(
SamzaRunnerOverrideConfigs.FN_CONTROL_PORT,
String.valueOf(pipelineOptions.getControlPort()));
+ overrideConfig.put(SamzaRunnerOverrideConfigs.FS_TOKEN_PATH, pipelineOptions.getFsTokenPath());
+
pipelineOptions.setConfigOverride(overrideConfig);
return new SamzaJobServerDriver(pipelineOptions);
}
- private static InMemoryJobService createJobService(SamzaPortablePipelineOptions pipelineOptions)
- throws IOException {
+ private InMemoryJobService createJobService() throws IOException {
JobInvoker jobInvoker =
new JobInvoker("samza-job-invoker") {
@Override
@@ -80,16 +82,7 @@ public class SamzaJobServerDriver {
@Nullable String retrievalToken,
ListeningExecutorService executorService)
throws IOException {
- String invocationId =
- String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString());
- SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions);
- JobInfo jobInfo =
- JobInfo.create(
- invocationId,
- pipelineOptions.getJobName(),
- retrievalToken,
- PipelineOptionsTranslation.toProto(pipelineOptions));
- return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
+ return new SamzaJobInvocation(pipeline, pipelineOptions);
}
};
return InMemoryJobService.create(
@@ -100,17 +93,46 @@ public class SamzaJobServerDriver {
InMemoryJobService.DEFAULT_MAX_INVOCATION_HISTORY);
}
+ private ExpansionServer createExpansionService(String host, int expansionPort)
+ throws IOException {
+ if (host == null) {
+ host = InetAddress.getLoopbackAddress().getHostName();
+ }
+ ExpansionServer expansionServer =
+ ExpansionServer.create(new ExpansionService(), host, expansionPort);
+ LOG.info(
+ "Java ExpansionService started on {}:{}",
+ expansionServer.getHost(),
+ expansionServer.getPort());
+ return expansionServer;
+ }
+
public void run() throws Exception {
- final InMemoryJobService service = createJobService(pipelineOptions);
+ // Create services
+ final InMemoryJobService service = createJobService();
final GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
GrpcFnServer.allocatePortAndCreateFor(
service, ServerFactory.createWithPortSupplier(pipelineOptions::getJobPort));
- LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+ final String jobServerUrl = jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl();
+ LOG.info("JobServer started on {}", jobServerUrl);
+ final URI uri = new URI(jobServerUrl);
+ final ExpansionServer expansionServer =
+ createExpansionService(uri.getHost(), pipelineOptions.getExpansionPort());
+
try {
jobServiceGrpcFnServer.getServer().awaitTermination();
} finally {
LOG.info("JobServer closing");
jobServiceGrpcFnServer.close();
+ if (expansionServer != null) {
+ try {
+ expansionServer.close();
+ LOG.info(
+ "Expansion stopped on {}:{}", expansionServer.getHost(), expansionServer.getPort());
+ } catch (Exception e) {
+ LOG.error("Error while closing the Expansion Service.", e);
+ }
+ }
}
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 3ff64e3..c0af1fa 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -23,8 +23,8 @@ import java.util.Map;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.samza.config.ConfigFactory;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.config.ConfigLoaderFactory;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.metrics.MetricsReporter;
/** Options which can be used to configure a Samza PortablePipelineRunner. */
@@ -38,10 +38,10 @@ public interface SamzaPipelineOptions extends PipelineOptions {
void setConfigFilePath(String filePath);
@Description("The factory to read config file from config file path.")
- @Default.Class(PropertiesConfigFactory.class)
- Class<? extends ConfigFactory> getConfigFactory();
+ @Default.Class(PropertiesConfigLoaderFactory.class)
+ Class<? extends ConfigLoaderFactory> getConfigLoaderFactory();
- void setConfigFactory(Class<? extends ConfigFactory> configFactory);
+ void setConfigLoaderFactory(Class<? extends ConfigLoaderFactory> configLoaderFactory);
@Description(
"The config override to set programmatically. It will be applied on "
@@ -76,6 +76,18 @@ public interface SamzaPipelineOptions extends PipelineOptions {
void setSystemBufferSize(int consumerBufferSize);
+ @Description("The maximum number of event-time timers to buffer in memory for a PTransform")
+ @Default.Integer(50000)
+ int getEventTimerBufferSize();
+
+ void setEventTimerBufferSize(int eventTimerBufferSize);
+
+ @Description("The maximum number of ready timers to process at once per watermark.")
+ @Default.Integer(Integer.MAX_VALUE)
+ int getMaxReadyTimersToProcessOnce();
+
+ void setMaxReadyTimersToProcessOnce(int maxReadyTimersToProcessOnce);
+
@Description("The maximum parallelism allowed for any data source.")
@Default.Integer(1)
int getMaxSourceParallelism();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
index 591c0ee..7702b6b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
@@ -18,11 +18,12 @@
package org.apache.beam.runners.samza;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-import static org.apache.samza.config.TaskConfig.MAX_CONCURRENCY;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
/** Validates that the {@link SamzaPipelineOptions} conforms to all the criteria. */
public class SamzaPipelineOptionsValidator {
@@ -32,30 +33,25 @@ public class SamzaPipelineOptionsValidator {
}
/*
- * Perform some bundling related validation for pipeline option .
+ * Perform some bundling related validation for pipeline option.
+ * Visible for testing.
*/
- private static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions) {
+ static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions) {
if (pipelineOptions.getMaxBundleSize() > 1) {
- // TODO: remove this check and implement bundling for side input, timer, etc in DoFnOp.java
- checkState(
- isPortable(pipelineOptions),
- "Bundling is not supported in non portable mode. Please disable by setting maxBundleSize to 1.");
-
- String taskConcurrencyConfig = MAX_CONCURRENCY;
- Map<String, String> configs =
+ final Map<String, String> configs =
pipelineOptions.getConfigOverride() == null
? new HashMap<>()
: pipelineOptions.getConfigOverride();
- long taskConcurrency = Long.parseLong(configs.getOrDefault(taskConcurrencyConfig, "1"));
- checkState(
- taskConcurrency == 1,
- "Bundling is not supported if "
- + taskConcurrencyConfig
- + " is greater than 1. Please disable bundling by setting maxBundleSize to 1. Or disable task concurrency.");
- }
- }
+ final JobConfig jobConfig = new JobConfig(new MapConfig(configs));
- private static boolean isPortable(SamzaPipelineOptions options) {
- return options instanceof SamzaPortablePipelineOptions;
+ // TODO: once Samza supports a better thread pool model, e.g. thread
+ // per-task/key-range, this can be supported.
+ checkArgument(
+ jobConfig.getThreadPoolSize() <= 1,
+ JOB_CONTAINER_THREAD_POOL_SIZE
+ + " cannot be configured to"
+ + " greater than 1 for max bundle size: "
+ + pipelineOptions.getMaxBundleSize());
+ }
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
index 661c1a5..5a82606 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
@@ -33,4 +33,17 @@ public interface SamzaPortablePipelineOptions extends SamzaPipelineOptions {
int getControlPort();
void setControlPort(int port);
+
+ @Description("The expansion service port. (Default: 11442) ")
+ @Default.Integer(11442)
+ int getExpansionPort();
+
+ void setExpansionPort(int port);
+
+ @Description(
+ "The file path for the local file system token. If not set (by default), then the runner would"
+ + " not use secure server factory.")
+ String getFsTokenPath();
+
+ void setFsTokenPath(String path);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
index e31fea9..3546a1c 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
@@ -19,6 +19,10 @@ package org.apache.beam.runners.samza;
import java.time.Duration;
+// TODO: can we get rid of this class? Right now the SamzaPipelineOptionsValidator would force
+// the pipeline options to be of type SamzaPipelineOptions. Ideally, we should be able to keep
+// passing SamzaPortablePipelineOptions. Alternatively, we could merge portable and non-portable
+// pipeline options.
/** A helper class for holding all the beam runner specific samza configs. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
@@ -33,6 +37,8 @@ public class SamzaRunnerOverrideConfigs {
public static final String CONTROL_CLIENT_MAX_WAIT_TIME_MS = "controL.wait.time.ms";
public static final long DEFAULT_CONTROL_CLIENT_MAX_WAIT_TIME_MS =
Duration.ofMinutes(2).toMillis();
+ public static final String FS_TOKEN_PATH = BEAM_RUNNER_CONFIG_PREFIX + "fs.token.path";
+ public static final String DEFAULT_FS_TOKEN_PATH = null;
private static boolean containsKey(SamzaPipelineOptions options, String configKey) {
if (options == null || options.getConfigOverride() == null) {
@@ -67,4 +73,13 @@ public class SamzaRunnerOverrideConfigs {
return DEFAULT_CONTROL_CLIENT_MAX_WAIT_TIME_MS;
}
}
+
+ /** Get fs token path for portable mode. */
+ public static String getFsTokenPath(SamzaPipelineOptions options) {
+ if (containsKey(options, FS_TOKEN_PATH)) {
+ return options.getConfigOverride().get(FS_TOKEN_PATH);
+ } else {
+ return DEFAULT_FS_TOKEN_PATH;
+ }
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index ef9531b..f94caae 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -372,7 +372,13 @@ public class UnboundedSourceSystem {
final Instant nextWatermark = reader.getWatermark();
if (currentWatermark.isBefore(nextWatermark)) {
currentWatermarks.put(ssp, nextWatermark);
- enqueueWatermark(reader);
+ if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+ enqueueWatermark(reader);
+ } else {
+ // Max watermark has been reached for this reader.
+ enqueueMaxWatermarkAndEndOfStream(reader);
+ running = false;
+ }
}
}
@@ -403,6 +409,37 @@ public class UnboundedSourceSystem {
queues.get(ssp).put(envelope);
}
+ // Send a max watermark message and an end of stream message to the corresponding ssp to
+ // close windows and finish the task.
+ private void enqueueMaxWatermarkAndEndOfStream(UnboundedReader<T> reader) {
+ final SystemStreamPartition ssp = readerToSsp.get(reader);
+ // Send the max watermark to force completion of any open windows.
+ final IncomingMessageEnvelope watermarkEnvelope =
+ IncomingMessageEnvelope.buildWatermarkEnvelope(
+ ssp, BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ enqueueUninterruptibly(watermarkEnvelope);
+
+ final IncomingMessageEnvelope endOfStreamEnvelope =
+ IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp);
+ enqueueUninterruptibly(endOfStreamEnvelope);
+ }
+
+ private void enqueueUninterruptibly(IncomingMessageEnvelope envelope) {
+ final BlockingQueue<IncomingMessageEnvelope> queue =
+ queues.get(envelope.getSystemStreamPartition());
+ while (true) {
+ try {
+ queue.put(envelope);
+ return;
+ } catch (InterruptedException e) {
+ // Some events require that we post an envelope to the queue even if the interrupt
+ // flag was set (i.e. during a call to stop) to ensure that the consumer properly
+ // shuts down. Consequently, if we receive an interrupt here we ignore it and retry
+ // the put operation.
+ }
+ }
+ }
+
void stop() {
running = false;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
index 60d7f69..6ca8b29 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
@@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory;
public class BeamContainerRunner implements ApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(BeamContainerRunner.class);
+ @SuppressWarnings("rawtypes")
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+ @SuppressWarnings("rawtypes")
public BeamContainerRunner(SamzaApplication app, Config config) {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
}
@@ -56,9 +58,7 @@ public class BeamContainerRunner implements ApplicationRunner {
}));
ContainerLaunchUtil.run(
- appDesc,
- System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()),
- ContainerCfgFactory.jobModel);
+ appDesc, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID), ContainerCfgLoader.jobModel);
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java
new file mode 100644
index 0000000..fb00a01
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.container;
+
+import java.time.Duration;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.clustermanager.JobCoordinatorLaunchUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.runtime.ApplicationRunner;
+
+/** Runs on Yarn AM, executes planning and launches JobCoordinator. */
+public class BeamJobCoordinatorRunner implements ApplicationRunner {
+
+ @SuppressWarnings("rawtypes")
+ private final SamzaApplication<? extends ApplicationDescriptor> app;
+
+ private final Config config;
+
+ /**
+ * Constructs a {@link BeamJobCoordinatorRunner} to run the {@code app} with the {@code config}.
+ *
+ * @param app application to run
+ * @param config configuration for the application
+ */
+ @SuppressWarnings("rawtypes")
+ public BeamJobCoordinatorRunner(
+ SamzaApplication<? extends ApplicationDescriptor> app, Config config) {
+ this.app = app;
+ this.config = config;
+ }
+
+ @Override
+ public void run(ExternalContext externalContext) {
+ JobCoordinatorLaunchUtil.run(app, config);
+ }
+
+ @Override
+ public void kill() {
+ throw new UnsupportedOperationException(
+ "BeamJobCoordinatorRunner#kill should never be invoked.");
+ }
+
+ @Override
+ public ApplicationStatus status() {
+ throw new UnsupportedOperationException(
+ "BeamJobCoordinatorRunner#status should never be invoked.");
+ }
+
+ @Override
+ public void waitForFinish() {
+ throw new UnsupportedOperationException(
+ "BeamJobCoordinatorRunner#waitForFinish should never be invoked.");
+ }
+
+ @Override
+ public boolean waitForFinish(Duration timeout) {
+ throw new UnsupportedOperationException(
+ "BeamJobCoordinatorRunner#waitForFinish should never be invoked.");
+ }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
similarity index 79%
rename from runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java
rename to runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
index cb97b58..99455e7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
@@ -17,12 +17,11 @@
*/
package org.apache.beam.runners.samza.container;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.ConfigLoader;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.SamzaContainer;
@@ -30,26 +29,27 @@ import org.apache.samza.job.model.JobModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Factory for the Beam yarn container to load job model. */
+/** Loader for the Beam yarn container to load job model. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class ContainerCfgFactory implements ConfigFactory {
- private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);
+public class ContainerCfgLoader implements ConfigLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgLoader.class);
private static final Object LOCK = new Object();
static volatile JobModel jobModel;
@Override
- public Config getConfig(URI configUri) {
+ public Config getConfig() {
if (jobModel == null) {
synchronized (LOCK) {
if (jobModel == null) {
- String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+ final String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
LOG.info(String.format("Got container ID: %s", containerId));
- String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+ final String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
LOG.info(String.format("Got coordinator URL: %s", coordinatorUrl));
- int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
+ final int delay =
+ new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
similarity index 65%
copy from runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
copy to runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
index d1e1f06..d3b090d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
@@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.samza.runtime;
+package org.apache.beam.runners.samza.container;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigLoader;
+import org.apache.samza.config.ConfigLoaderFactory;
-/** Output emitter for Samza {@link Op}. */
-public interface OpEmitter<OutT> {
- void emitElement(WindowedValue<OutT> element);
-
- void emitWatermark(Instant watermark);
-
- <T> void emitView(String id, WindowedValue<Iterable<T>> elements);
+/** Factory for the Beam yarn container to get the loader used to load the job model. */
+public class ContainerCfgLoaderFactory implements ConfigLoaderFactory {
+ @Override
+ public ConfigLoader getLoader(Config config) {
+ return new ContainerCfgLoader();
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
new file mode 100644
index 0000000..e5bf8ec
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bundle management for the {@link DoFnOp} that handles the lifecycle of a bundle. It also serves
+ * as a proxy for the {@link DoFnOp} to process watermarks and decides whether to 1. Hold the
+ * watermark if there is at least one bundle in progress. 2. Propagate the watermark to the
+ * downstream DAG, if all the previous bundles have completed.
+ *
+ * <p>A bundle is considered complete only when the outputs corresponding to each element in the
+ * bundle have been resolved and the watermark associated with the bundle (if any) is propagated
+ * downstream. The output of an element is considered resolved based on the nature of the ParDoFn:
+ * 1. In case of synchronous ParDo, outputs of the element are resolved immediately after
+ * processElement returns. 2. In case of asynchronous ParDo, outputs of the element are resolved
+ * when all the futures emitted by processElement are resolved.
+ *
+ * <p>This class is not thread safe and the current implementation relies on the assumption that
+ * messages are dispatched to BundleManager in a single threaded mode.
+ *
+ * @param <OutT> output type of the {@link DoFnOp}
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BundleManager<OutT> {
+ private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
+ private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
+
+ private final long maxBundleSize;
+ private final long maxBundleTimeMs;
+ private final BundleProgressListener<OutT> bundleProgressListener;
+ private final FutureCollector<OutT> futureCollector;
+ private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
+ private final String bundleCheckTimerId;
+
+ // Number of elements belonging to the current active bundle
+ private transient AtomicLong currentBundleElementCount;
+ // Number of bundles that are in progress but not yet finished
+ private transient AtomicLong pendingBundleCount;
+ // Denotes the start time of the current active bundle
+ private transient AtomicLong bundleStartTime;
+ // Denotes if there is an active in-progress bundle. Note that at a given time, we can have
+ // multiple bundles in progress.
+ // This flag denotes if there is a bundle that is current and hasn't been closed.
+ private transient AtomicBoolean isBundleStarted;
+ // Holder for watermark which gets propagated when the bundle is finished.
+ private transient Instant bundleWatermarkHold;
+ // A future that is completed once all futures belonging to the current active bundle are
+ // completed. The value is null if there are no futures in the current active bundle.
+ private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
+ private transient CompletionStage<Void> watermarkFuture;
+
+ public BundleManager(
+ BundleProgressListener<OutT> bundleProgressListener,
+ FutureCollector<OutT> futureCollector,
+ long maxBundleSize,
+ long maxBundleTimeMs,
+ Scheduler<KeyedTimerData<Void>> bundleTimerScheduler,
+ String bundleCheckTimerId) {
+ this.maxBundleSize = maxBundleSize;
+ this.maxBundleTimeMs = maxBundleTimeMs;
+ this.bundleProgressListener = bundleProgressListener;
+ this.bundleTimerScheduler = bundleTimerScheduler;
+ this.bundleCheckTimerId = bundleCheckTimerId;
+ this.futureCollector = futureCollector;
+
+ if (maxBundleSize > 1) {
+ scheduleNextBundleCheck();
+ }
+
+ // instance variable initialization for bundle tracking
+ this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
+ this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
+ this.currentBundleElementCount = new AtomicLong(0L);
+ this.isBundleStarted = new AtomicBoolean(false);
+ this.pendingBundleCount = new AtomicLong(0L);
+ this.watermarkFuture = CompletableFuture.completedFuture(null);
+ }
+
+ /*
+ * Schedule in processing time to check whether the current bundle should be closed. Note that
+ * we only approximately achieve max bundle time by checking as frequently as half of the max
+ * bundle time set by users. This would violate the max bundle time by up to half of it but should
+ * be acceptable in most cases (and cheaper than scheduling a timer at the start of every bundle).
+ */
+ private void scheduleNextBundleCheck() {
+ final Instant nextBundleCheckTime =
+ Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
+ final TimerInternals.TimerData timerData =
+ TimerInternals.TimerData.of(
+ this.bundleCheckTimerId,
+ StateNamespaces.global(),
+ nextBundleCheckTime,
+ nextBundleCheckTime,
+ TimeDomain.PROCESSING_TIME);
+ bundleTimerScheduler.schedule(
+ new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
+ }
+
+ void tryStartBundle() {
+ futureCollector.prepare();
+
+ if (isBundleStarted.compareAndSet(false, true)) {
+ LOG.debug("Starting a new bundle.");
+ // make sure the previous bundle is sealed and futures are cleared
+ Preconditions.checkArgument(
+ currentActiveBundleDoneFutureReference.get() == null,
+ "Current active bundle done future should be null before starting a new bundle.");
+ bundleStartTime.set(System.currentTimeMillis());
+ pendingBundleCount.incrementAndGet();
+ bundleProgressListener.onBundleStarted();
+ }
+
+ currentBundleElementCount.incrementAndGet();
+ }
+
+ void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+ // propagate watermark immediately if no bundle is in progress and all the previous bundles have
+ // completed.
+ if (!isBundleStarted() && pendingBundleCount.get() == 0) {
+ LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
+ bundleProgressListener.onWatermark(watermark, emitter);
+ return;
+ }
+
+ // hold back the watermark since there is either a bundle in progress or previously closed
+ // bundles are unfinished.
+ this.bundleWatermarkHold = watermark;
+
+ // for batch mode, the max watermark should force the bundle to close
+ if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+ /*
+ * Due to the lack of an async watermark function, we block on the previous watermark futures before propagating the watermark
+ * downstream. If a bundle is in progress, tryFinishBundle() will force the bundle to close and emit the watermark.
+ * If no bundle is in progress, we propagate the watermark explicitly after the completion of previous watermark futures.
+ */
+ if (isBundleStarted()) {
+ LOG.info(
+ "Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
+ tryFinishBundle(emitter);
+ watermarkFuture.toCompletableFuture().join();
+ } else {
+ LOG.info(
+ "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
+ watermarkFuture.toCompletableFuture().join();
+ bundleProgressListener.onWatermark(watermark, emitter);
+ }
+ }
+ }
+
+ void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
+ // this is internal timer in processing time to check whether a bundle should be closed
+ if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
+ tryFinishBundle(emitter);
+ scheduleNextBundleCheck();
+ }
+ }
+
+ /**
+ * Signal the bundle manager to handle failure. We discard the output collected as part of
+ * processing the current element and reset the bundle count.
+ *
+ * @param t failure cause
+ */
+ void signalFailure(Throwable t) {
+ LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
+ futureCollector.discard();
+ // reset the bundle start flag only if the bundle has started
+ isBundleStarted.compareAndSet(true, false);
+
+ // bundle start may not necessarily mean we have actually started the bundle since some of the
+ // invariant check conditions within bundle start could throw exceptions, so rely on bundle
+ // start time
+ if (bundleStartTime.get() != Long.MAX_VALUE) {
+ currentBundleElementCount.set(0L);
+ bundleStartTime.set(Long.MAX_VALUE);
+ pendingBundleCount.decrementAndGet();
+ currentActiveBundleDoneFutureReference.set(null);
+ }
+ }
+
+ void tryFinishBundle(OpEmitter<OutT> emitter) {
+
+ // we need to seal the output for each element within a bundle irrespective of whether we
+ // decide to finish the
+ // bundle or not
+ CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = futureCollector.finish();
+
+ if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
+ LOG.debug("Finishing the current bundle.");
+
+ // reset the bundle count
+ // seal the bundle and emit the result future (collection of results)
+ // chain the finish bundle invocation on the completion of the bundle's output futures
+ currentBundleElementCount.set(0L);
+ bundleStartTime.set(Long.MAX_VALUE);
+ Instant watermarkHold = bundleWatermarkHold;
+ bundleWatermarkHold = null;
+
+ CompletionStage<Void> currentActiveBundleDoneFuture =
+ currentActiveBundleDoneFutureReference.get();
+ outputFuture =
+ outputFuture.thenCombine(
+ currentActiveBundleDoneFuture != null
+ ? currentActiveBundleDoneFuture
+ : CompletableFuture.completedFuture(null),
+ (res, ignored) -> {
+ bundleProgressListener.onBundleFinished(emitter);
+ return res;
+ });
+
+ BiConsumer<Collection<WindowedValue<OutT>>, Void> watermarkPropagationFn;
+ if (watermarkHold == null) {
+ watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet();
+ } else {
+ watermarkPropagationFn =
+ (ignored, res) -> {
+ LOG.debug("Propagating watermark: {} to downstream.", watermarkHold);
+ bundleProgressListener.onWatermark(watermarkHold, emitter);
+ pendingBundleCount.decrementAndGet();
+ };
+ }
+
+ // We chain the current watermark emission with previous watermark and the output futures
+ // since bundles can finish out of order but we still want the watermark to be emitted in
+ // order.
+ watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn);
+ currentActiveBundleDoneFutureReference.set(null);
+ } else if (isBundleStarted.get()) {
+ final CompletableFuture<Collection<WindowedValue<OutT>>> finalOutputFuture =
+ outputFuture.toCompletableFuture();
+ currentActiveBundleDoneFutureReference.updateAndGet(
+ maybePrevFuture -> {
+ CompletableFuture<Void> prevFuture =
+ maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
+
+ return CompletableFuture.allOf(prevFuture, finalOutputFuture);
+ });
+ }
+
+ // emit the future to propagate it to the rest of the DAG
+ emitter.emitFuture(outputFuture);
+ }
+
+ @VisibleForTesting
+ long getCurrentBundleElementCount() {
+ return currentBundleElementCount.longValue();
+ }
+
+ @VisibleForTesting
+ @Nullable
+ CompletionStage<Void> getCurrentBundleDoneFuture() {
+ return currentActiveBundleDoneFutureReference.get();
+ }
+
+ @VisibleForTesting
+ void setCurrentBundleDoneFuture(CompletableFuture<Void> currentBundleResultFuture) {
+ this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
+ }
+
+ @VisibleForTesting
+ long getPendingBundleCount() {
+ return pendingBundleCount.longValue();
+ }
+
+ @VisibleForTesting
+ void setPendingBundleCount(long value) {
+ pendingBundleCount.set(value);
+ }
+
+ @VisibleForTesting
+ boolean isBundleStarted() {
+ return isBundleStarted.get();
+ }
+
+ @VisibleForTesting
+ void setBundleWatermarkHold(Instant watermark) {
+ this.bundleWatermarkHold = watermark;
+ }
+
+ /**
+ * We close the current bundle in progress if one of the following criteria is met: 1. The bundle
+ * element count ≥ maxBundleSize 2. Time elapsed since the bundle started is ≥ maxBundleTimeMs 3.
+ * Watermark hold is equal to TIMESTAMP_MAX_VALUE, which is usually the case for bounded jobs
+ *
+ * @return true - if one of the criteria above is satisfied; false - otherwise
+ */
+ private boolean shouldFinishBundle() {
+ return isBundleStarted.get()
+ && (currentBundleElementCount.get() >= maxBundleSize
+ || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
+ || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
+ }
+
+ /**
+ * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
+ * consists of 1. Start bundle - Invoked when the bundle is started 2. Finish bundle - Invoked
+ * when the bundle is complete. Refer to the docs under {@link BundleManager} for definition on
+ * when a bundle is considered complete. 3. onWatermark - Invoked when watermark is ready to be
+ * propagated to downstream DAG. Refer to the docs under {@link BundleManager} on when watermark
+ * is held vs propagated.
+ *
+ * @param <OutT> output type of the {@link DoFnOp}
+ */
+ public interface BundleProgressListener<OutT> {
+ void onBundleStarted();
+
+ void onBundleFinished(OpEmitter<OutT> emitter);
+
+ void onWatermark(Instant watermark, OpEmitter<OutT> emitter);
+ }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 9f2ea43..6565218 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -17,15 +17,19 @@
*/
package org.apache.beam.runners.samza.runtime;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
@@ -40,9 +44,9 @@ import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -60,7 +64,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterator
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +75,6 @@ import org.slf4j.LoggerFactory;
})
public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
- private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
private final TupleTag<FnOutT> mainOutputTag;
private final DoFn<InT, FnOutT> doFn;
@@ -113,17 +115,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
// TODO: add this to checkpointable state
private transient Instant inputWatermark;
- private transient Instant bundleWatermarkHold;
+ private transient BundleManager<OutT> bundleManager;
private transient Instant sideInputWatermark;
private transient List<WindowedValue<InT>> pushbackValues;
private transient StageBundleFactory stageBundleFactory;
- private transient long maxBundleSize;
- private transient long maxBundleTimeMs;
- private transient AtomicLong currentBundleElementCount;
- private transient AtomicLong bundleStartTime;
- private transient AtomicBoolean isBundleStarted;
- private transient Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
private DoFnSchemaInformation doFnSchemaInformation;
+ private transient boolean bundleDisabled;
private Map<String, PCollectionView<?>> sideInputMapping;
public DoFnOp(
@@ -178,26 +175,27 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
- this.currentBundleElementCount = new AtomicLong(0L);
- this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
- this.isBundleStarted = new AtomicBoolean(false);
- this.bundleWatermarkHold = null;
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final SamzaExecutionContext samzaExecutionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
this.samzaPipelineOptions = samzaExecutionContext.getPipelineOptions();
- this.maxBundleSize = samzaPipelineOptions.getMaxBundleSize();
- this.maxBundleTimeMs = samzaPipelineOptions.getMaxBundleTimeMs();
- this.bundleTimerScheduler = timerRegistry;
-
- if (this.maxBundleSize > 1) {
- scheduleNextBundleCheck();
- }
+ this.bundleDisabled = samzaPipelineOptions.getMaxBundleSize() <= 1;
+ final String stateId = "pardo-" + transformId;
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- transformId, null, context.getTaskContext(), samzaPipelineOptions, signature);
+ stateId, null, context.getTaskContext(), samzaPipelineOptions, signature);
+ final FutureCollector<OutT> outputFutureCollector = createFutureCollector();
+
+ this.bundleManager =
+ new BundleManager<>(
+ createBundleProgressListener(),
+ outputFutureCollector,
+ samzaPipelineOptions.getMaxBundleSize(),
+ samzaPipelineOptions.getMaxBundleTimeMs(),
+ timerRegistry,
+ bundleCheckTimerId);
this.timerInternalsFactory =
SamzaTimerInternalsFactory.createTimerInternalFactory(
@@ -224,7 +222,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
SamzaDoFnRunners.createPortable(
samzaPipelineOptions,
bundledEventsBagState,
- outputManagerFactory.create(emitter),
+ outputManagerFactory.create(emitter, outputFutureCollector),
stageBundleFactory,
mainOutputTag,
idToTupleTagMap,
@@ -237,13 +235,13 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
doFn,
windowingStrategy,
transformFullName,
- transformId,
+ stateId,
context,
mainOutputTag,
sideInputHandler,
timerInternalsFactory,
keyCoder,
- outputManagerFactory.create(emitter),
+ outputManagerFactory.create(emitter, outputFutureCollector),
inputCoder,
sideOutputTags,
outputCoders,
@@ -267,24 +265,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
doFnInvoker.invokeSetup();
}
- /*
- * Schedule in processing time to check whether the current bundle should be closed. Note that
- * we only approximately achieve max bundle time by checking as frequent as half of the max bundle
- * time set by users. This would violate the max bundle time by up to half of it but should
- * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle).
- */
- private void scheduleNextBundleCheck() {
- final Instant nextBundleCheckTime =
- Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
- final TimerInternals.TimerData timerData =
- TimerInternals.TimerData.of(
- bundleCheckTimerId,
- StateNamespaces.global(),
- nextBundleCheckTime,
- nextBundleCheckTime,
- TimeDomain.PROCESSING_TIME);
- bundleTimerScheduler.schedule(
- new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
+ /*package private*/ FutureCollector<OutT> createFutureCollector() {
+ return new FutureCollectorImpl<>();
}
private String getTimerStateId(DoFnSignature signature) {
@@ -295,51 +277,25 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
return builder.toString();
}
- private void attemptStartBundle() {
- if (isBundleStarted.compareAndSet(false, true)) {
- currentBundleElementCount.set(0L);
- bundleStartTime.set(System.currentTimeMillis());
- pushbackFnRunner.startBundle();
- }
- }
-
- private void finishBundle(OpEmitter<OutT> emitter) {
- if (isBundleStarted.compareAndSet(true, false)) {
- currentBundleElementCount.set(0L);
- bundleStartTime.set(Long.MAX_VALUE);
- pushbackFnRunner.finishBundle();
- if (bundleWatermarkHold != null) {
- doProcessWatermark(bundleWatermarkHold, emitter);
- }
- bundleWatermarkHold = null;
- }
- }
-
- private void attemptFinishBundle(OpEmitter<OutT> emitter) {
- if (!isBundleStarted.get()) {
- return;
- }
- if (currentBundleElementCount.get() >= maxBundleSize
- || System.currentTimeMillis() - bundleStartTime.get() > maxBundleTimeMs) {
- finishBundle(emitter);
- }
- }
-
@Override
public void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter) {
- attemptStartBundle();
-
- final Iterable<WindowedValue<InT>> rejectedValues =
- pushbackFnRunner.processElementInReadyWindows(inputElement);
- for (WindowedValue<InT> rejectedValue : rejectedValues) {
- if (rejectedValue.getTimestamp().compareTo(pushbackWatermarkHold) < 0) {
- pushbackWatermarkHold = rejectedValue.getTimestamp();
+ try {
+ bundleManager.tryStartBundle();
+ final Iterable<WindowedValue<InT>> rejectedValues =
+ pushbackFnRunner.processElementInReadyWindows(inputElement);
+ for (WindowedValue<InT> rejectedValue : rejectedValues) {
+ if (rejectedValue.getTimestamp().compareTo(pushbackWatermarkHold) < 0) {
+ pushbackWatermarkHold = rejectedValue.getTimestamp();
+ }
+ pushbackValues.add(rejectedValue);
}
- pushbackValues.add(rejectedValue);
- }
- currentBundleElementCount.incrementAndGet();
- attemptFinishBundle(emitter);
+ bundleManager.tryFinishBundle(emitter);
+ } catch (Throwable t) {
+ LOG.error("Encountered error during process element", t);
+ bundleManager.signalFailure(t);
+ throw t;
+ }
}
private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {
@@ -373,21 +329,14 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
@Override
public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
- if (!isBundleStarted.get()) {
- doProcessWatermark(watermark, emitter);
- } else {
- // if there is a bundle in progress, hold back the watermark until end of the bundle
- this.bundleWatermarkHold = watermark;
- if (watermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- // for batch mode, the max watermark should force the bundle to close
- finishBundle(emitter);
- }
- }
+ bundleManager.processWatermark(watermark, emitter);
}
@Override
public void processSideInput(
String id, WindowedValue<? extends Iterable<?>> elements, OpEmitter<OutT> emitter) {
+ checkState(
+ bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
@SuppressWarnings("unchecked")
final WindowedValue<Iterable<?>> retypedElements = (WindowedValue<Iterable<?>>) elements;
@@ -413,6 +362,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
@Override
public void processSideInputWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+ checkState(
+ bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
sideInputWatermark = watermark;
if (sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -425,8 +376,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
// this is internal timer in processing time to check whether a bundle should be closed
if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
- attemptFinishBundle(emitter);
- scheduleNextBundleCheck();
+ bundleManager.processTimer(keyedTimerData, emitter);
return;
}
@@ -439,7 +389,6 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
@Override
public void close() {
- bundleWatermarkHold = null;
doFnInvoker.invokeTeardown();
try (AutoCloseable closer = stageBundleFactory) {
// do nothing
@@ -471,6 +420,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
}
}
+ // todo: should this go through bundle manager to start and finish the bundle?
private void emitAllPushbackValues() {
if (!pushbackValues.isEmpty()) {
pushbackFnRunner.startBundle();
@@ -487,6 +437,88 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
}
}
+ private BundleManager.BundleProgressListener<OutT> createBundleProgressListener() {
+ return new BundleManager.BundleProgressListener<OutT>() {
+ @Override
+ public void onBundleStarted() {
+ pushbackFnRunner.startBundle();
+ }
+
+ @Override
+ public void onBundleFinished(OpEmitter<OutT> emitter) {
+ pushbackFnRunner.finishBundle();
+ }
+
+ @Override
+ public void onWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+ doProcessWatermark(watermark, emitter);
+ }
+ };
+ }
+
+ static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
+ WindowedValue<T> windowedValue,
+ CompletionStage<T> valueFuture,
+ Function<T, OutT> valueMapper) {
+ return valueFuture.thenApply(
+ res ->
+ WindowedValue.of(
+ valueMapper.apply(res),
+ windowedValue.getTimestamp(),
+ windowedValue.getWindows(),
+ windowedValue.getPane()));
+ }
+
+ static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
+ private final List<CompletionStage<WindowedValue<OutT>>> outputFutures;
+ private AtomicBoolean collectorSealed;
+
+ FutureCollectorImpl() {
+ /*
+ * Choosing a synchronized list here since the concurrency is low as the message dispatch thread is single threaded.
+ * We need this to guard against scenarios where watermark/finish bundle trigger outputs.
+ */
+ outputFutures = Collections.synchronizedList(new ArrayList<>());
+ collectorSealed = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void add(CompletionStage<WindowedValue<OutT>> element) {
+ checkState(
+ !collectorSealed.get(),
+ "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
+ outputFutures.add(element);
+ }
+
+ @Override
+ public void discard() {
+ collectorSealed.compareAndSet(false, true);
+ outputFutures.clear();
+ }
+
+ @Override
+ public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
+ /*
+ * We can ignore the result here because it's okay to call finish without invoking prepare. It will be a no-op
+ * and an empty collection will be returned.
+ */
+ collectorSealed.compareAndSet(false, true);
+
+ CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture =
+ FutureUtils.flattenFutures(outputFutures);
+ outputFutures.clear();
+ return sealedOutputFuture;
+ }
+
+ @Override
+ public void prepare() {
+ boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
+ checkState(
+ isCollectorSealed,
+ "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
+ }
+ }
+
/**
* Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
* emits values to the main output only, which is a single {@link
@@ -497,13 +529,31 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
public static class SingleOutputManagerFactory<OutT> implements OutputManagerFactory<OutT> {
@Override
public DoFnRunners.OutputManager create(OpEmitter<OutT> emitter) {
+ return createOutputManager(emitter, null);
+ }
+
+ @Override
+ public DoFnRunners.OutputManager create(
+ OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
+ return createOutputManager(emitter, collector);
+ }
+
+ private DoFnRunners.OutputManager createOutputManager(
+ OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
return new DoFnRunners.OutputManager() {
@Override
+ @SuppressWarnings("unchecked")
public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
// With only one input we know that T is of type OutT.
- @SuppressWarnings("unchecked")
- final WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>) windowedValue;
- emitter.emitElement(retypedWindowedValue);
+ if (windowedValue.getValue() instanceof CompletionStage) {
+ CompletionStage<T> valueFuture = (CompletionStage<T>) windowedValue.getValue();
+ if (collector != null) {
+ collector.add(createOutputFuture(windowedValue, valueFuture, value -> (OutT) value));
+ }
+ } else {
+ final WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>) windowedValue;
+ emitter.emitElement(retypedWindowedValue);
+ }
}
};
}
@@ -523,13 +573,34 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
@Override
public DoFnRunners.OutputManager create(OpEmitter<RawUnionValue> emitter) {
+ return createOutputManager(emitter, null);
+ }
+
+ @Override
+ public DoFnRunners.OutputManager create(
+ OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
+ return createOutputManager(emitter, collector);
+ }
+
+ private DoFnRunners.OutputManager createOutputManager(
+ OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
return new DoFnRunners.OutputManager() {
@Override
+ @SuppressWarnings("unchecked")
public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
final int index = tagToIndexMap.get(tupleTag);
final T rawValue = windowedValue.getValue();
- final RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
- emitter.emitElement(windowedValue.withValue(rawUnionValue));
+ if (rawValue instanceof CompletionStage) {
+ CompletionStage<T> valueFuture = (CompletionStage<T>) rawValue;
+ if (collector != null) {
+ collector.add(
+ createOutputFuture(
+ windowedValue, valueFuture, res -> new RawUnionValue(index, res)));
+ }
+ } else {
+ final RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
+ emitter.emitElement(windowedValue.withValue(rawUnionValue));
+ }
}
};
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java
new file mode 100644
index 0000000..acb2eba
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A future collector that buffers the output from the user's {@link
+ * org.apache.beam.sdk.transforms.DoFn} and propagates the result future to downstream operators
+ * only after {@link #finish()} is invoked.
+ *
+ * @param <OutT> type of the output element
+ */
+public interface FutureCollector<OutT> {
+ /**
+ * Outputs the element to the collector.
+ *
+ * @param element to add to the collector
+ */
+ void add(CompletionStage<WindowedValue<OutT>> element);
+
+ /**
+ * Discards the elements within the collector. Once the elements have been discarded, callers need
+ * to prepare the collector again before invoking {@link #add(CompletionStage)}.
+ */
+ void discard();
+
+ /**
+ * Seals this {@link FutureCollector}, returning a {@link CompletionStage} containing all of the
+ * elements that were added to it. The {@link #add(CompletionStage)} method will throw an {@link
+ * IllegalStateException} if called after a call to finish.
+ *
+ * <p>The {@link FutureCollector} needs to be prepared again to collect a newer batch of output.
+ */
+ CompletionStage<Collection<WindowedValue<OutT>>> finish();
+
+ /**
+ * Prepares the {@link FutureCollector} to accept output elements. The {@link
+ * #add(CompletionStage)} method will throw an {@link IllegalStateException} if called without
+ * preparing the collector.
+ */
+ void prepare();
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index a84dde6..c4022f9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -32,8 +32,6 @@ import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
@@ -111,11 +109,10 @@ public class GroupByKeyOp<K, InputT, OutputT>
Context context,
Scheduler<KeyedTimerData<K>> timerRegistry,
OpEmitter<KV<K, OutputT>> emitter) {
- this.pipelineOptions =
- Base64Serializer.deserializeUnchecked(
- config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
- .get()
- .as(SamzaPipelineOptions.class);
+
+ final SamzaExecutionContext samzaExecutionContext =
+ (SamzaExecutionContext) context.getApplicationContainerContext();
+ this.pipelineOptions = samzaExecutionContext.getPipelineOptions();
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 9dcec3d..9c6bf38 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -42,6 +42,7 @@ import org.joda.time.Instant;
* {@link Comparable} by first comparing the wrapped TimerData then the key.
*/
@SuppressWarnings({
+ "keyfor",
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 564ee46..46746a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,24 +21,30 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
-import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Adaptor class that runs a Samza {@link Op} for BEAM in the Samza {@link FlatMapFunction}. */
+/**
+ * Adaptor class that runs a Samza {@link Op} for BEAM in the Samza {@link AsyncFlatMapFunction}.
+ * This class is initialized once for each Op within each Task.
+ */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class OpAdapter<InT, OutT, K>
- implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
+ implements AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
WatermarkFunction<OpMessage<OutT>>,
ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
Serializable {
@@ -46,12 +52,13 @@ public class OpAdapter<InT, OutT, K>
private final Op<InT, OutT, K> op;
private transient List<OpMessage<OutT>> outputList;
+ private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
private transient Instant outputWatermark;
private transient OpEmitter<OutT> emitter;
private transient Config config;
private transient Context context;
- public static <InT, OutT, K> FlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
+ public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
Op<InT, OutT, K> op) {
return new OpAdapter<>(op);
}
@@ -76,7 +83,7 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public Collection<OpMessage<OutT>> apply(OpMessage<InT> message) {
+ public synchronized CompletionStage<Collection<OpMessage<OutT>>> apply(OpMessage<InT> message) {
assert outputList.isEmpty();
try {
@@ -99,13 +106,26 @@ public class OpAdapter<InT, OutT, K>
throw UserCodeException.wrap(e);
}
- final List<OpMessage<OutT>> results = new ArrayList<>(outputList);
+ CompletionStage<Collection<OpMessage<OutT>>> resultFuture =
+ CompletableFuture.completedFuture(new ArrayList<>(outputList));
+
+ if (outputFuture != null) {
+ resultFuture =
+ resultFuture.thenCombine(
+ outputFuture,
+ (res1, res2) -> {
+ res1.addAll(res2);
+ return res1;
+ });
+ }
+
outputList.clear();
- return results;
+ outputFuture = null;
+ return resultFuture;
}
@Override
- public Collection<OpMessage<OutT>> processWatermark(long time) {
+ public synchronized Collection<OpMessage<OutT>> processWatermark(long time) {
assert outputList.isEmpty();
try {
@@ -122,12 +142,13 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public Long getOutputWatermark() {
+ public synchronized Long getOutputWatermark() {
return outputWatermark != null ? outputWatermark.getMillis() : null;
}
@Override
- public Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
+ public synchronized Collection<OpMessage<OutT>> onCallback(
+ KeyedTimerData<K> keyedTimerData, long time) {
assert outputList.isEmpty();
try {
@@ -154,6 +175,13 @@ public class OpAdapter<InT, OutT, K>
}
@Override
+ public void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> resultFuture) {
+ outputFuture =
+ resultFuture.thenApply(
+ res -> res.stream().map(OpMessage::ofElement).collect(Collectors.toList()));
+ }
+
+ @Override
public void emitWatermark(Instant watermark) {
outputWatermark = watermark;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
index d1e1f06..951f5df 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.runners.samza.runtime;
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
/** Output emitter for Samza {@link Op}. */
public interface OpEmitter<OutT> {
+
+ void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> resultFuture);
+
void emitElement(WindowedValue<OutT> element);
void emitWatermark(Instant watermark);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
index e404c5f..5d4047d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
@@ -23,4 +23,9 @@ import org.apache.beam.runners.core.DoFnRunners;
/** Factory class to create {@link DoFnRunners.OutputManager}. */
public interface OutputManagerFactory<OutT> extends Serializable {
DoFnRunners.OutputManager create(OpEmitter<OutT> emitter);
+
+ default DoFnRunners.OutputManager create(
+ OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
+ return create(emitter);
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index a388c25..cd30d22 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -89,14 +89,14 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
new ThreadLocal<>();
// the stores include both beamStore for system states as well as stores for user state
- private final Map<String, KeyValueStore<ByteArray, byte[]>> stores;
+ private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
- Map<String, KeyValueStore<ByteArray, byte[]>> stores,
+ Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
@Nullable K key,
byte @Nullable [] keyBytes,
String stageId,
@@ -109,21 +109,23 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
@SuppressWarnings("unchecked")
- static KeyValueStore<ByteArray, byte[]> getBeamStore(TaskContext context) {
- return (KeyValueStore<ByteArray, byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
+ static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context) {
+ return (KeyValueStore<ByteArray, StateValue<?>>)
+ context.getStore(SamzaStoreStateInternals.BEAM_STORE);
}
- static Factory createStateInternalFactory(
+ @SuppressWarnings("unchecked")
+ static <K> Factory<K> createStateInternalFactory(
String id,
- Coder<?> keyCoder,
+ Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
DoFnSignature signature) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
- final Map<String, KeyValueStore<ByteArray, byte[]>> stores = new HashMap<>();
+ final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
- final Coder stateKeyCoder;
+ final Coder<K> stateKeyCoder;
if (keyCoder != null) {
signature
.stateDeclarations()
@@ -131,10 +133,11 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
.forEach(
stateId ->
stores.put(
- stateId, (KeyValueStore<ByteArray, byte[]>) context.getStore(stateId)));
+ stateId,
+ (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
stateKeyCoder = keyCoder;
} else {
- stateKeyCoder = VoidCoder.of();
+ stateKeyCoder = (Coder<K>) VoidCoder.of();
}
return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
}
@@ -227,13 +230,13 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
/** Factory class to create {@link SamzaStoreStateInternals}. */
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
- private final Map<String, KeyValueStore<ByteArray, byte[]>> stores;
+ private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
- Map<String, KeyValueStore<ByteArray, byte[]>> stores,
+ Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
Coder<K> keyCoder,
int batchGetSize) {
this.stageId = stageId;
@@ -270,34 +273,28 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
private abstract class AbstractSamzaState<T> {
- private final Coder<T> coder;
- private final byte[] encodedStoreKey;
- private final String namespace;
- protected final KeyValueStore<ByteArray, byte[]> store;
+ private final StateNamespace namespace;
+ private final String addressId;
+ private final boolean isBeamStore;
+ private final String stageId;
+ private final byte[] keyBytes;
+ private byte[] encodedStoreKey;
+ protected final Coder<T> coder;
+ protected final KeyValueStore<ByteArray, StateValue<T>> store;
+ @SuppressWarnings({"unchecked", "rawtypes"})
protected AbstractSamzaState(
StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
this.coder = coder;
- this.namespace = namespace.stringKey();
-
- final KeyValueStore<ByteArray, byte[]> userStore = stores.get(address.getId());
- this.store = userStore != null ? userStore : stores.get(BEAM_STORE);
-
- final ByteArrayOutputStream baos = getThreadLocalBaos();
- try (DataOutputStream dos = new DataOutputStream(baos)) {
- dos.write(keyBytes);
- dos.writeUTF(namespace.stringKey());
-
- if (userStore == null) {
- // for system state, we need to differentiate based on the following:
- dos.writeUTF(stageId);
- dos.writeUTF(address.getId());
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Could not encode full address for state: " + address.getId(), e);
- }
- this.encodedStoreKey = baos.toByteArray();
+ this.namespace = namespace;
+ this.addressId = address.getId();
+ this.isBeamStore = !stores.containsKey(address.getId());
+ this.store =
+ isBeamStore
+ ? (KeyValueStore) stores.get(BEAM_STORE)
+ : (KeyValueStore) stores.get(address.getId());
+ this.stageId = SamzaStoreStateInternals.this.stageId;
+ this.keyBytes = SamzaStoreStateInternals.this.keyBytes;
}
protected void clearInternal() {
@@ -305,12 +302,12 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
protected void writeInternal(T value) {
- store.put(getEncodedStoreKey(), encodeValue(value));
+ store.put(getEncodedStoreKey(), StateValue.of(value, coder));
}
protected T readInternal() {
- final byte[] valueBytes = store.get(getEncodedStoreKey());
- return decodeValue(valueBytes);
+ final StateValue<T> stateValue = store.get(getEncodedStoreKey());
+ return decodeValue(stateValue);
}
protected ReadableState<Boolean> isEmptyInternal() {
@@ -328,32 +325,31 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
protected ByteArray getEncodedStoreKey() {
- return ByteArray.of(encodedStoreKey);
+ return ByteArray.of(getEncodedStoreKeyBytes());
}
protected byte[] getEncodedStoreKeyBytes() {
- return encodedStoreKey;
- }
-
- protected byte[] encodeValue(T value) {
- final ByteArrayOutputStream baos = getThreadLocalBaos();
- try {
- coder.encode(value, baos);
- } catch (IOException e) {
- throw new RuntimeException("Could not encode state value: " + value, e);
- }
- return baos.toByteArray();
- }
-
- protected T decodeValue(byte[] valueBytes) {
- if (valueBytes != null) {
- try {
- return coder.decode(new ByteArrayInputStream(valueBytes));
+ if (encodedStoreKey == null) {
+ final ByteArrayOutputStream baos = getThreadLocalBaos();
+ try (DataOutputStream dos = new DataOutputStream(baos)) {
+ dos.write(keyBytes);
+ dos.writeUTF(namespace.stringKey());
+
+ if (isBeamStore) {
+ // for system state, we need to differentiate based on the following:
+ dos.writeUTF(stageId);
+ dos.writeUTF(addressId);
+ }
} catch (IOException e) {
- throw new RuntimeException("Could not decode state", e);
+ throw new RuntimeException("Could not encode full address for state: " + addressId, e);
}
+ this.encodedStoreKey = baos.toByteArray();
}
- return null;
+ return encodedStoreKey;
+ }
+
+ protected T decodeValue(StateValue<T> stateValue) {
+ return stateValue == null ? null : stateValue.getValue(coder);
}
@Override
@@ -367,13 +363,20 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@SuppressWarnings("unchecked")
final AbstractSamzaState<?> that = (AbstractSamzaState<?>) o;
- return Arrays.equals(encodedStoreKey, that.encodedStoreKey);
+ if (isBeamStore || that.isBeamStore) {
+ if (!isBeamStore || !that.isBeamStore || !stageId.equals(that.stageId)) {
+ return false;
+ }
+ }
+ return Arrays.equals(keyBytes, that.keyBytes)
+ && addressId.equals(that.addressId)
+ && this.namespace.equals(that.namespace);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + Arrays.hashCode(encodedStoreKey);
+ result = 31 * result + Arrays.hashCode(getEncodedStoreKeyBytes());
return result;
}
}
@@ -417,8 +420,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
synchronized (store) {
final int size = getSize();
final ByteArray encodedKey = encodeKey(size);
- store.put(encodedKey, encodeValue(value));
- store.put(getEncodedStoreKey(), Ints.toByteArray(size + 1));
+ store.put(encodedKey, StateValue.of(value, coder));
+ store.put(getEncodedStoreKey(), StateValue.of(Ints.toByteArray(size + 1)));
}
}
@@ -476,8 +479,10 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
private int getSize() {
- final byte[] sizeBytes = store.get(getEncodedStoreKey());
- return sizeBytes == null ? 0 : Ints.fromByteArray(sizeBytes);
+ final StateValue stateSize = store.get(getEncodedStoreKey());
+ return (stateSize == null || stateSize.valueBytes == null)
+ ? 0
+ : Ints.fromByteArray(stateSize.valueBytes);
}
private ByteArray encodeKey(int size) {
@@ -589,7 +594,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
private final Coder<KeyT> keyCoder;
private final int storeKeySize;
- private final List<KeyValueIterator<ByteArray, byte[]>> openIterators =
+ private final List<KeyValueIterator<ByteArray, StateValue<ValueT>>> openIterators =
Collections.synchronizedList(new ArrayList<>());
private int maxKeySize;
@@ -611,7 +616,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
public void put(KeyT key, ValueT value) {
final ByteArray encodedKey = encodeKey(key);
maxKeySize = Math.max(maxKeySize, encodedKey.getValue().length);
- store.put(encodedKey, encodeValue(value));
+ store.put(encodedKey, StateValue.of(value, coder));
}
@Override
@@ -687,7 +692,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@Override
public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
final ByteArray maxKey = createMaxKey();
- final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
+ final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+ store.range(getEncodedStoreKey(), maxKey);
openIterators.add(kvIter);
return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>() {
@@ -707,7 +713,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@Override
public Map.Entry<KeyT, ValueT> next() {
- Entry<ByteArray, byte[]> entry = kvIter.next();
+ Entry<ByteArray, StateValue<ValueT>> entry = kvIter.next();
return new AbstractMap.SimpleEntry<>(
decodeKey(entry.getKey()), decodeValue(entry.getValue()));
}
@@ -726,16 +732,19 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
* properly, we need to load the content into memory.
*/
private <OutputT> Iterable<OutputT> createIterable(
- SerializableFunction<org.apache.samza.storage.kv.Entry<ByteArray, byte[]>, OutputT> fn) {
+ SerializableFunction<
+ org.apache.samza.storage.kv.Entry<ByteArray, StateValue<ValueT>>, OutputT>
+ fn) {
final ByteArray maxKey = createMaxKey();
- final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
- final List<Entry<ByteArray, byte[]>> iterable = ImmutableList.copyOf(kvIter);
+ final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+ store.range(getEncodedStoreKey(), maxKey);
+ final List<Entry<ByteArray, StateValue<ValueT>>> iterable = ImmutableList.copyOf(kvIter);
kvIter.close();
return new Iterable<OutputT>() {
@Override
public Iterator<OutputT> iterator() {
- final Iterator<Entry<ByteArray, byte[]>> iter = iterable.iterator();
+ final Iterator<Entry<ByteArray, StateValue<ValueT>>> iter = iterable.iterator();
return new Iterator<OutputT>() {
@Override
@@ -755,7 +764,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@Override
public void clear() {
final ByteArray maxKey = createMaxKey();
- final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
+ final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+ store.range(getEncodedStoreKey(), maxKey);
while (kvIter.hasNext()) {
store.delete(kvIter.next().getKey());
}
@@ -975,4 +985,76 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
}
}
+
+ /**
+ * Wrapper for state value so that unencoded value can be read directly from the cache of
+ * KeyValueStore.
+ */
+ public static class StateValue<T> implements Serializable {
+ private T value;
+ private Coder<T> valueCoder;
+ private byte[] valueBytes;
+
+ private StateValue(T value, Coder<T> valueCoder, byte[] valueBytes) {
+ this.value = value;
+ this.valueCoder = valueCoder;
+ this.valueBytes = valueBytes;
+ }
+
+ public static <T> StateValue<T> of(T value, Coder<T> valueCoder) {
+ return new StateValue<>(value, valueCoder, null);
+ }
+
+ public static <T> StateValue<T> of(byte[] valueBytes) {
+ return new StateValue<>(null, null, valueBytes);
+ }
+
+ public T getValue(Coder<T> coder) {
+ if (value == null && valueBytes != null) {
+ if (valueCoder == null) {
+ valueCoder = coder;
+ }
+ try {
+ value = valueCoder.decode(new ByteArrayInputStream(valueBytes));
+ } catch (IOException e) {
+ throw new RuntimeException("Could not decode state", e);
+ }
+ }
+ return value;
+ }
+
+ public byte[] getValueBytes() {
+ if (valueBytes == null && value != null) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ valueCoder.encode(value, baos);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not encode state value: " + value, e);
+ }
+ valueBytes = baos.toByteArray();
+ }
+ return valueBytes;
+ }
+ }
+
+ /** Factory class to provide {@link StateValueSerdeFactory.StateValueSerde}. */
+ public static class StateValueSerdeFactory implements SerdeFactory<StateValue<?>> {
+ @Override
+ public Serde<StateValue<?>> getSerde(String name, Config config) {
+ return new StateValueSerde();
+ }
+
+ /** Serde for {@link StateValue}. */
+ public static class StateValueSerde implements Serde<StateValue<?>> {
+ @Override
+ public StateValue<?> fromBytes(byte[] bytes) {
+ return StateValue.of(bytes);
+ }
+
+ @Override
+ public byte[] toBytes(StateValue<?> stateValue) {
+ return stateValue == null ? null : stateValue.getValueBytes();
+ }
+ }
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 1fc0324..ad426f2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.state.SamzaMapState;
+import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -64,8 +65,7 @@ import org.slf4j.LoggerFactory;
})
public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
-
- private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
+ private final NavigableSet<KeyedTimerData<K>> eventTimeBuffer;
private final Coder<K> keyCoder;
private final Scheduler<KeyedTimerData<K>> timerRegistry;
private final SamzaTimerState state;
@@ -74,16 +74,31 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ // Each event timer is around 200B, so with the default buffer size of 50k timers the buffer
+ // takes around 10MB.
+ private final int maxEventTimerBufferSize;
+ // Max event time stored in eventTimeBuffer
+ // If it is set to Long.MAX_VALUE, it indicates the State does not contain any KeyedTimerData
+ private long maxEventTimeInBuffer;
+
+ // The maximum number of ready timers to process at once per watermark.
+ private final long maxReadyTimersToProcessOnce;
+
private SamzaTimerInternalsFactory(
Coder<K> keyCoder,
Scheduler<KeyedTimerData<K>> timerRegistry,
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
Coder<BoundedWindow> windowCoder,
- IsBounded isBounded) {
+ IsBounded isBounded,
+ SamzaPipelineOptions pipelineOptions) {
this.keyCoder = keyCoder;
this.timerRegistry = timerRegistry;
- this.eventTimeTimers = new TreeSet<>();
+ this.eventTimeBuffer = new TreeSet<>();
+ this.maxEventTimerBufferSize =
+ pipelineOptions.getEventTimerBufferSize(); // must be placed before state initialization
+ this.maxEventTimeInBuffer = Long.MAX_VALUE;
+ this.maxReadyTimersToProcessOnce = pipelineOptions.getMaxReadyTimersToProcessOnce();
this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
this.isBounded = isBounded;
}
@@ -105,7 +120,8 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
timerStateId,
nonKeyedStateInternalsFactory,
windowCoder,
- isBounded);
+ isBounded,
+ pipelineOptions);
}
@Override
@@ -152,16 +168,37 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
outputWatermark = watermark;
}
+ /**
+ * This method is called when a watermark arrives. It compares timers in the memory buffer with the
+ * watermark to collect ready timers. When the memory buffer is empty, it asks the store to reload
+ * timers into the buffer. Note that the number of timers returned may exceed the buffer size.
+ *
+ * @return a collection of ready timers to be fired
+ */
public Collection<KeyedTimerData<K>> removeReadyTimers() {
final Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>();
- while (!eventTimeTimers.isEmpty()
- && eventTimeTimers.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
- final KeyedTimerData<K> keyedTimerData = eventTimeTimers.pollFirst();
+ while (!eventTimeBuffer.isEmpty()
+ && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)
+ && readyTimers.size() < maxReadyTimersToProcessOnce) {
+
+ final KeyedTimerData<K> keyedTimerData = eventTimeBuffer.pollFirst();
readyTimers.add(keyedTimerData);
state.deletePersisted(keyedTimerData);
+
+ if (eventTimeBuffer.isEmpty()) {
+ state.reloadEventTimeTimers();
+ }
}
+ LOG.debug("Removed {} ready timers", readyTimers.size());
+ if (readyTimers.size() == maxReadyTimersToProcessOnce
+ && !eventTimeBuffer.isEmpty()
+ && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
+ LOG.warn(
+ "Loaded {} expired timers, the remaining will be processed at next watermark.",
+ maxReadyTimersToProcessOnce);
+ }
return readyTimers;
}
@@ -177,6 +214,11 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
return outputWatermark;
}
+ // for unit test only
+ NavigableSet<KeyedTimerData<K>> getEventTimeBuffer() {
+ return eventTimeBuffer;
+ }
+
private class SamzaTimerInternals implements TimerInternals {
private final byte[] keyBytes;
private final K key;
@@ -202,13 +244,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
public void setTimer(TimerData timerData) {
if (isBounded == IsBounded.UNBOUNDED
&& timerData.getTimestamp().getMillis()
- >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
- // No need to register a timer of max timestamp if the input is unbounded
+ > GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
+ // No need to register a timer greater than maxTimestamp if the input is unbounded.
+ // 1. It will ignore timers with (maxTimestamp + 1) created by stateful ParDo with global
+ // window.
+ // 2. It will register timers with maxTimestamp so that global window can be closed
+ // correctly when max watermark comes.
return;
}
final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
- if (eventTimeTimers.contains(keyedTimerData)) {
+ if (eventTimeBuffer.contains(keyedTimerData)) {
return;
}
@@ -230,9 +276,32 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
// persist it first
state.persist(keyedTimerData);
+ // TO-DO: apply the same memory optimization over processing timers
switch (timerData.getDomain()) {
case EVENT_TIME:
- eventTimeTimers.add(keyedTimerData);
+ /**
+ * To determine if the upcoming KeyedTimerData could be added to the Buffer while
+ * guaranteeing the Buffer's timestamps are all <= those in State Store to preserve
+ * timestamp eviction priority:
+ *
+ * <p>1) If maxEventTimeInBuffer == Long.MAX_VALUE, it indicates that the State is
+ * empty, therefore all the Event times greater or lesser than newTimestamp are in the
+ * buffer;
+ *
+ * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
+ * greater than newTimestamp, so it is safe to add it to the buffer
+ *
+ * <p>In case that the Buffer is full, we remove the largest timer from memory according
+ * to {@link KeyedTimerData#compareTo}
+ */
+ if (newTimestamp < maxEventTimeInBuffer) {
+ eventTimeBuffer.add(keyedTimerData);
+ if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
+ eventTimeBuffer.pollLast();
+ maxEventTimeInBuffer =
+ eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
+ }
+ }
break;
case PROCESSING_TIME:
@@ -272,7 +341,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
switch (timerData.getDomain()) {
case EVENT_TIME:
- eventTimeTimers.remove(keyedTimerData);
+ eventTimeBuffer.remove(keyedTimerData);
break;
case PROCESSING_TIME:
@@ -281,7 +350,8 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
default:
throw new UnsupportedOperationException(
- String.format("%s currently only supports event time", SamzaRunner.class));
+ String.format(
+ "%s currently only supports event time or processing time", SamzaRunner.class));
}
}
@@ -309,15 +379,16 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
}
private class SamzaTimerState {
- private final SamzaMapState<TimerKey<K>, Long> eventTimerTimerState;
- private final SamzaMapState<TimerKey<K>, Long> processingTimerTimerState;
+ private final SamzaMapState<TimerKey<K>, Long> eventTimeTimerState;
+ private final SamzaSetState<KeyedTimerData<K>> timestampSortedEventTimeTimerState;
+ private final SamzaMapState<TimerKey<K>, Long> processingTimeTimerState;
SamzaTimerState(
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
Coder<BoundedWindow> windowCoder) {
- this.eventTimerTimerState =
+ this.eventTimeTimerState =
(SamzaMapState<TimerKey<K>, Long>)
nonKeyedStateInternalsFactory
.stateInternalsForKey(null)
@@ -328,7 +399,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
new TimerKeyCoder<>(keyCoder, windowCoder),
VarLongCoder.of()));
- this.processingTimerTimerState =
+ this.timestampSortedEventTimeTimerState =
+ (SamzaSetState<KeyedTimerData<K>>)
+ nonKeyedStateInternalsFactory
+ .stateInternalsForKey(null)
+ .state(
+ StateNamespaces.global(),
+ StateTags.set(
+ timerStateId + "-ts",
+ new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+ this.processingTimeTimerState =
(SamzaMapState<TimerKey<K>, Long>)
nonKeyedStateInternalsFactory
.stateInternalsForKey(null)
@@ -339,17 +420,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
new TimerKeyCoder<>(keyCoder, windowCoder),
VarLongCoder.of()));
- restore();
+ init();
}
Long get(KeyedTimerData<K> keyedTimerData) {
final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
switch (keyedTimerData.getTimerData().getDomain()) {
case EVENT_TIME:
- return eventTimerTimerState.get(timerKey).read();
+ return eventTimeTimerState.get(timerKey).read();
case PROCESSING_TIME:
- return processingTimerTimerState.get(timerKey).read();
+ return processingTimeTimerState.get(timerKey).read();
default:
throw new UnsupportedOperationException(
@@ -361,18 +442,29 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
switch (keyedTimerData.getTimerData().getDomain()) {
case EVENT_TIME:
- eventTimerTimerState.put(
+ final Long timestamp = eventTimeTimerState.get(timerKey).read();
+
+ if (timestamp != null) {
+ final KeyedTimerData keyedTimerDataInStore =
+ TimerKey.toKeyedTimerData(timerKey, timestamp, TimeDomain.EVENT_TIME, keyCoder);
+ timestampSortedEventTimeTimerState.remove(keyedTimerDataInStore);
+ }
+ eventTimeTimerState.put(
timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
+
+ timestampSortedEventTimeTimerState.add(keyedTimerData);
+
break;
case PROCESSING_TIME:
- processingTimerTimerState.put(
+ processingTimeTimerState.put(
timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
break;
default:
throw new UnsupportedOperationException(
- String.format("%s currently only supports event time", SamzaRunner.class));
+ String.format(
+ "%s currently only supports event time or processing time", SamzaRunner.class));
}
}
@@ -380,38 +472,52 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
switch (keyedTimerData.getTimerData().getDomain()) {
case EVENT_TIME:
- eventTimerTimerState.remove(timerKey);
+ eventTimeTimerState.remove(timerKey);
+ timestampSortedEventTimeTimerState.remove(keyedTimerData);
break;
case PROCESSING_TIME:
- processingTimerTimerState.remove(timerKey);
+ processingTimeTimerState.remove(timerKey);
break;
default:
throw new UnsupportedOperationException(
- String.format("%s currently only supports event time", SamzaRunner.class));
+ String.format(
+ "%s currently only supports event time or processing time", SamzaRunner.class));
}
}
- private void loadEventTimeTimers() {
- final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
- eventTimerTimerState.readIterator().read();
- // since the iterator will reach to the end, it will be closed automatically
- while (iter.hasNext()) {
- final Map.Entry<TimerKey<K>, Long> entry = iter.next();
- final KeyedTimerData keyedTimerData =
- TimerKey.toKeyedTimerData(
- entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
+ /**
+ * Reload event time timers from state into the memory buffer until it holds
+ * maxEventTimerBufferSize timers or the state iterator is exhausted.
+ */
+ private void reloadEventTimeTimers() {
+ final Iterator<KeyedTimerData<K>> iter =
+ timestampSortedEventTimeTimerState.readIterator().read();
- eventTimeTimers.add(keyedTimerData);
+ while (iter.hasNext() && eventTimeBuffer.size() < maxEventTimerBufferSize) {
+ final KeyedTimerData<K> keyedTimerData = iter.next();
+ eventTimeBuffer.add(keyedTimerData);
+ maxEventTimeInBuffer = keyedTimerData.getTimerData().getTimestamp().getMillis();
}
- LOG.info("Loaded {} event time timers in memory", eventTimeTimers.size());
+ ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
+ .closeIterators();
+ LOG.info("Loaded {} event time timers in memory", eventTimeBuffer.size());
+
+ if (eventTimeBuffer.size() < maxEventTimerBufferSize) {
+ LOG.debug(
+ "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
+ eventTimeBuffer.size(),
+ maxEventTimerBufferSize);
+ // Buffer not full means the iterator ran out: flag that State holds no more KeyedTimerData
+ maxEventTimeInBuffer = Long.MAX_VALUE;
+ }
}
private void loadProcessingTimeTimers() {
final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
- processingTimerTimerState.readIterator().read();
+ processingTimeTimerState.readIterator().read();
// since the iterator will reach to the end, it will be closed automatically
int count = 0;
while (iter.hasNext()) {
@@ -424,12 +530,41 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
++count;
}
+ ((SamzaStoreStateInternals.KeyValueIteratorState) processingTimeTimerState).closeIterators();
LOG.info("Loaded {} processing time timers in memory", count);
}
- private void restore() {
- loadEventTimeTimers();
+ /**
+ * Restore timer state from RocksDB. This is needed for migration of existing jobs. Given events
+ * in eventTimeTimerState, construct timestampSortedEventTimeTimerState preparing for memory
+ * reloading. TO-DO: processing time timers are still loaded into memory in one shot; will apply
+ * the same optimization mechanism as event time timer
+ */
+ private void init() {
+ final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
+ eventTimeTimerState.readIterator().read();
+ // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
+ if (eventTimersIter.hasNext()) {
+ final Iterator sortedEventTimerIter =
+ timestampSortedEventTimeTimerState.readIterator().read();
+
+ if (!sortedEventTimerIter.hasNext()) {
+ // inline the migration code
+ while (eventTimersIter.hasNext()) {
+ final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
+ final KeyedTimerData keyedTimerData =
+ TimerKey.toKeyedTimerData(
+ entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
+ timestampSortedEventTimeTimerState.add(keyedTimerData);
+ }
+ }
+ ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
+ .closeIterators();
+ }
+ ((SamzaStoreStateInternals.KeyValueIteratorState) eventTimeTimerState).closeIterators();
+
+ reloadEventTimeTimers();
loadProcessingTimeTimers();
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index b8cfa27..08bef16 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -22,9 +22,10 @@ import static org.apache.samza.config.JobConfig.JOB_ID;
import static org.apache.samza.config.JobConfig.JOB_NAME;
import static org.apache.samza.config.TaskConfig.COMMIT_MS;
import static org.apache.samza.config.TaskConfig.GROUPER_FACTORY;
+import static org.apache.samza.config.TaskConfig.MAX_CONCURRENCY;
import java.io.File;
-import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -34,21 +35,21 @@ import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.container.BeamContainerRunner;
+import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.ConfigLoaderFactory;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ZkConfig;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.serializers.ByteSerdeFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.slf4j.Logger;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
public class ConfigBuilder {
private static final Logger LOG = LoggerFactory.getLogger(ConfigBuilder.class);
+ private static final String BEAM_STORE_FACTORY = "stores.beamStore.factory";
private static final String APP_RUNNER_CLASS = "app.runner.class";
private static final String YARN_PACKAGE_PATH = "yarn.package.path";
private static final String JOB_FACTORY_CLASS = "job.factory.class";
@@ -80,10 +82,11 @@ public class ConfigBuilder {
config.putAll(properties);
}
+ /** @return built configuration */
public Config build() {
try {
// apply framework configs
- config.putAll(createSystemConfig(options));
+ config.putAll(createSystemConfig(options, config));
// apply user configs
config.putAll(createUserConfig(options));
@@ -92,7 +95,10 @@ public class ConfigBuilder {
config.put(ApplicationConfig.APP_ID, options.getJobInstance());
config.put(JOB_NAME, options.getJobName());
config.put(JOB_ID, options.getJobInstance());
+ config.put(MAX_CONCURRENCY, String.valueOf(options.getMaxBundleSize()));
+ // remove config overrides before serialization (LISAMZA-15259)
+ options.setConfigOverride(new HashMap<>());
config.put(
"beamPipelineOptions",
Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
@@ -116,21 +122,21 @@ public class ConfigBuilder {
if (StringUtils.isNoneEmpty(configFilePath)) {
LOG.info("configFilePath: " + configFilePath);
- final File configFile = new File(configFilePath);
- final URI configUri = configFile.toURI();
- final ConfigFactory configFactory =
- options.getConfigFactory().getDeclaredConstructor().newInstance();
+ final Config properties = new MapConfig(Collections.singletonMap("path", configFilePath));
+ final ConfigLoaderFactory configLoaderFactory =
+ options.getConfigLoaderFactory().getDeclaredConstructor().newInstance();
- LOG.info("configFactory: " + configFactory.getClass().getName());
+ LOG.info("configLoaderFactory: " + configLoaderFactory.getClass().getName());
// Config file must exist for default properties config
// TODO: add check to all non-empty files once we don't need to
// pass the command-line args through the containers
- if (configFactory instanceof PropertiesConfigFactory) {
- checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath);
+ if (configLoaderFactory instanceof PropertiesConfigLoaderFactory) {
+ checkArgument(
+ new File(configFilePath).exists(), "Config file %s does not exist", configFilePath);
}
- config.putAll(configFactory.getConfig(configUri));
+ config.putAll(configLoaderFactory.getLoader(properties).getConfig());
}
// Apply override on top
if (options.getConfigOverride() != null) {
@@ -181,6 +187,7 @@ public class ConfigBuilder {
final String appRunner = config.get(APP_RUNNER_CLASS);
checkArgument(
appRunner == null
+ || BeamJobCoordinatorRunner.class.getName().equals(appRunner)
|| RemoteApplicationRunner.class.getName().equals(appRunner)
|| BeamContainerRunner.class.getName().equals(appRunner),
"Config %s must be set to %s for %s Deployment",
@@ -208,7 +215,7 @@ public class ConfigBuilder {
.put(
// TODO: remove after SAMZA-1531 is resolved
ApplicationConfig.APP_RUN_ID,
- String.valueOf(System.currentTimeMillis())
+ System.currentTimeMillis()
+ "-"
// use the most significant bits in UUID (8 digits) to avoid collision
+ UUID.randomUUID().toString().substring(0, 8))
@@ -231,23 +238,26 @@ public class ConfigBuilder {
.build();
}
- private static Map<String, String> createSystemConfig(SamzaPipelineOptions options) {
- ImmutableMap.Builder<String, String> configBuilder =
+ private static Map<String, String> createSystemConfig(
+ SamzaPipelineOptions options, Map<String, String> config) {
+ final ImmutableMap.Builder<String, String> configBuilder =
ImmutableMap.<String, String>builder()
- .put(
- "stores.beamStore.factory",
- "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
.put("stores.beamStore.key.serde", "byteArraySerde")
- .put("stores.beamStore.msg.serde", "byteSerde")
- .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName())
+ .put("stores.beamStore.msg.serde", "stateValueSerde")
+ .put(
+ "serializers.registry.stateValueSerde.class",
+ SamzaStoreStateInternals.StateValueSerdeFactory.class.getName())
.put(
"serializers.registry.byteArraySerde.class",
SamzaStoreStateInternals.ByteArraySerdeFactory.class.getName());
- if (options.getStateDurable()) {
- LOG.info("stateDurable is enabled");
- configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
- configBuilder.put("job.host-affinity.enabled", "true");
+ // if config does not contain "stores.beamStore.factory" at this moment,
+ // then it is a stateless job.
+ if (!config.containsKey(BEAM_STORE_FACTORY)) {
+ options.setStateDurable(false);
+ configBuilder.put(
+ BEAM_STORE_FACTORY,
+ "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
}
LOG.info("Execution environment is " + options.getSamzaExecutionEnvironment());
@@ -269,6 +279,23 @@ public class ConfigBuilder {
return configBuilder.build();
}
+ static Map<String, String> createRocksDBStoreConfig(SamzaPipelineOptions options) {
+ final ImmutableMap.Builder<String, String> configBuilder =
+ ImmutableMap.<String, String>builder()
+ .put(
+ BEAM_STORE_FACTORY,
+ "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
+ .put("stores.beamStore.rocksdb.compression", "lz4");
+
+ if (options.getStateDurable()) {
+ LOG.info("stateDurable is enabled");
+ configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
+ configBuilder.put("job.host-affinity.enabled", "true");
+ }
+
+ return configBuilder.build();
+ }
+
private static void validateConfigs(SamzaPipelineOptions options, Map<String, String> config) {
// validate execution environment
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
index d77bc1e..ea5f6d9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.runners.samza.translation;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -33,10 +35,12 @@ public class ConfigContext {
private final Map<PValue, String> idMap;
private AppliedPTransform<?, ?, ?> currentTransform;
private final SamzaPipelineOptions options;
+ private final Set<String> stateIds;
public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions options) {
this.idMap = idMap;
this.options = options;
+ this.stateIds = new HashSet<>();
}
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
@@ -60,6 +64,10 @@ public class ConfigContext {
return this.options;
}
+ public boolean addStateId(String stateId) {
+ return stateIds.add(stateId);
+ }
+
private String getIdForPValue(PValue pvalue) {
final String id = idMap.get(pvalue);
if (id == null) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index 50e62a2..ec7af07 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -61,7 +61,7 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
// for some of the validateRunner tests only
final MessageStream<OpMessage<T>> noOpStream =
ctx.getDummyStream()
- .flatMap(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
+ .flatMapAsync(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
ctx.registerMessageStream(output, noOpStream);
return;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 6d2b2b6..4f3c409 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -19,12 +19,14 @@ package org.apache.beam.runners.samza.translation;
import static org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils.escape;
+import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
@@ -56,7 +58,9 @@ import org.apache.samza.serializers.KVSerde;
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
class GroupByKeyTranslator<K, InputT, OutputT>
implements TransformTranslator<
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>,
+ TransformConfigGenerator<
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
@Override
public void translate(
@@ -111,6 +115,20 @@ class GroupByKeyTranslator<K, InputT, OutputT>
doTranslatePortable(transform, pipeline, ctx);
}
+ @Override
+ public Map<String, String> createConfig(
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+ TransformHierarchy.Node node,
+ ConfigContext ctx) {
+ return ConfigBuilder.createRocksDBStoreConfig(ctx.getPipelineOptions());
+ }
+
+ @Override
+ public Map<String, String> createPortableConfig(
+ PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
+ return ConfigBuilder.createRocksDBStoreConfig(options);
+ }
+
private static <K, InputT, OutputT> void doTranslatePortable(
PipelineNode.PTransformNode transform,
QueryablePipeline pipeline,
@@ -193,8 +211,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
partitionedInputStream
- .flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
- .flatMap(
+ .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+ .flatMapAsync(
OpAdapter.adapt(
new GroupByKeyOp<>(
outputTag,
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 2369f10..15f6f2f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -176,7 +176,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
}
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
- mergedStreams.flatMap(OpAdapter.adapt(op));
+ mergedStreams.flatMapAsync(OpAdapter.adapt(op));
for (int outputIndex : tagToIndexMap.values()) {
@SuppressWarnings("unchecked")
@@ -186,7 +186,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
}
@@ -218,12 +218,21 @@ class ParDoBoundMultiTranslator<InT, OutT>
} catch (IOException e) {
throw new RuntimeException(e);
}
+
String inputId = stagePayload.getInput();
final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStreamById(inputId);
+
// TODO: support side input
+ if (!stagePayload.getSideInputsList().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Side inputs in portable pipelines are not supported in samza");
+ }
+
+ // set side inputs to empty until it's supported
final List<MessageStream<OpMessage<InT>>> sideInputStreams = Collections.emptyList();
final Map<TupleTag<?>, Integer> tagToIndexMap = new HashMap<>();
+ final Map<Integer, String> indexToIdMap = new HashMap<>();
final Map<String, TupleTag<?>> idToTupleTagMap = new HashMap<>();
// first output as the main output
@@ -238,19 +247,20 @@ class ParDoBoundMultiTranslator<InT, OutT>
outputName -> {
TupleTag<?> tupleTag = new TupleTag<>(outputName);
tagToIndexMap.put(tupleTag, index.get());
- index.incrementAndGet();
String collectionId = outputs.get(outputName);
+ indexToIdMap.put(index.get(), collectionId);
idToTupleTagMap.put(collectionId, tupleTag);
+ index.incrementAndGet();
});
WindowedValue.WindowedValueCoder<InT> windowedInputCoder =
ctx.instantiateCoder(inputId, pipeline.getComponents());
- final DoFnSchemaInformation doFnSchemaInformation;
- doFnSchemaInformation = ParDoTranslation.getSchemaInformation(transform.getTransform());
-
- Map<String, PCollectionView<?>> sideInputMapping =
- ParDoTranslation.getSideInputMapping(transform.getTransform());
+ // TODO: support schema and side inputs for portable runner
+ // Note: transform.getTransform() is an ExecutableStage, not ParDo, so we need to extract
+ // these info from its components.
+ final DoFnSchemaInformation doFnSchemaInformation = null;
+ final Map<String, PCollectionView<?>> sideInputMapping = Collections.emptyMap();
final RunnerApi.PCollection input = pipeline.getComponents().getPcollectionsOrThrow(inputId);
final PCollection.IsBounded isBounded = SamzaPipelineTranslatorUtils.isBounded(input);
@@ -287,18 +297,19 @@ class ParDoBoundMultiTranslator<InT, OutT>
}
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
- mergedStreams.flatMap(OpAdapter.adapt(op));
+ mergedStreams.flatMapAsync(OpAdapter.adapt(op));
for (int outputIndex : tagToIndexMap.values()) {
+ @SuppressWarnings("unchecked")
final MessageStream<OpMessage<OutT>> outputStream =
taggedOutputStream
.filter(
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
- ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
+ ctx.registerMessageStream(indexToIdMap.get(outputIndex), outputStream);
}
}
@@ -309,15 +320,29 @@ class ParDoBoundMultiTranslator<InT, OutT>
final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
final SamzaPipelineOptions options = ctx.getPipelineOptions();
+ // If a ParDo observes the window directly or indirectly, then it is a stateful ParDo;
+ // in this case, we will use RocksDB as the system store.
+ if (signature.processElement().observesWindow()) {
+ config.putAll(ConfigBuilder.createRocksDBStoreConfig(options));
+ }
+
if (signature.usesState()) {
// set up user state configs
for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
final String storeId = state.id();
+
+ // TODO: remove validation after we support same state id in different ParDo.
+ if (!ctx.addStateId(storeId)) {
+ throw new IllegalStateException(
+ "Duplicate StateId " + storeId + " found in multiple ParDo.");
+ }
+
config.put(
"stores." + storeId + ".factory",
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
config.put("stores." + storeId + ".key.serde", "byteArraySerde");
- config.put("stores." + storeId + ".msg.serde", "byteSerde");
+ config.put("stores." + storeId + ".msg.serde", "stateValueSerde");
+ config.put("stores." + storeId + ".rocksdb.compression", "lz4");
if (options.getStateDurable()) {
config.put(
@@ -334,6 +359,13 @@ class ParDoBoundMultiTranslator<InT, OutT>
return config;
}
+ @Override
+ public Map<String, String> createPortableConfig(
+ PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
+ // TODO: Add beamStore configs when portable use case supports stateful ParDo.
+ return Collections.emptyMap();
+ }
+
static class SideInputWatermarkFn<InT>
implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
WatermarkFunction<OpMessage<InT>> {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 8eb8746..3514b4f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.samza.translation;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.service.AutoService;
import java.util.HashMap;
@@ -58,9 +57,6 @@ public class SamzaPipelineTranslator {
private SamzaPipelineTranslator() {}
public static void translate(Pipeline pipeline, TranslationContext ctx) {
- checkState(
- ctx.getPipelineOptions().getMaxBundleSize() <= 1,
- "bundling is not supported for non portable mode. Please disable bundling (by setting max bundle size to 1).");
final TransformVisitorFn translateFn =
new TransformVisitorFn() {
@@ -180,18 +176,19 @@ public class SamzaPipelineTranslator {
@Override
public Map<String, TransformTranslator<?>> getTransformTranslators() {
return ImmutableMap.<String, TransformTranslator<?>>builder()
- .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator())
- .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator())
- .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
- .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
- .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslator())
- .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator())
- .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new SamzaPublishViewTranslator())
+ .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator<>())
+ .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator<>())
+ .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+ .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+ .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslator<>())
+ .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator<>())
+ .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new SamzaPublishViewTranslator<>())
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
+ .put(ExecutableStage.URN, new ParDoBoundMultiTranslator<>())
+ .put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, new SamzaTestStreamTranslator())
.put(
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN,
new SplittableParDoTranslators.ProcessKeyedElements<>())
- .put(ExecutableStage.URN, new ParDoBoundMultiTranslator())
.build();
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java
new file mode 100644
index 0000000..96dc577
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A Samza system factory that supports consuming from {@link TestStream} and translating events
+ * into messages according to the {@link org.apache.beam.sdk.testing.TestStream.EventType} of the
+ * events.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class SamzaTestStreamSystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    // Read this system's own entries, i.e. the keys under the "systems.<systemName>." prefix.
+    final Config scopedConfig = config.subset("systems." + systemName + ".", true);
+    return new SmazaTestStreamSystemConsumer<>(getTestStream(scopedConfig));
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    throw new UnsupportedOperationException("SamzaTestStreamSystem doesn't support producing");
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SamzaTestStreamSystemAdmin();
+  }
+
+  /** Decodes the {@link TestStream} from the "testStreamDecoder" and "encodedTestStream" keys. */
+  private static <T> TestStream<T> getTestStream(Config config) {
+    @SuppressWarnings("unchecked")
+    final SerializableFunction<String, TestStream<T>> testStreamDecoder =
+        Base64Serializer.deserializeUnchecked(
+            config.get("testStreamDecoder"), SerializableFunction.class);
+    return testStreamDecoder.apply(config.get("encodedTestStream"));
+  }
+
+  /** Placeholder offset used for every message and metadata entry of the test stream. */
+  private static final String DUMMY_OFFSET = "0";
+
+  /** System admin for SamzaTestStreamSystem; every offset it reports is a dummy. */
+  public static class SamzaTestStreamSystemAdmin implements SystemAdmin {
+    @Override
+    public Map<SystemStreamPartition, String> getOffsetsAfter(
+        Map<SystemStreamPartition, String> offsets) {
+      return offsets.keySet().stream()
+          .collect(Collectors.toMap(Function.identity(), k -> DUMMY_OFFSET));
+    }
+
+    @Override
+    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+          .collect(
+              Collectors.toMap(
+                  Function.identity(),
+                  stream -> {
+                    // TestStream will always be single partition
+                    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>
+                        partitionMetadata =
+                            Collections.singletonMap(
+                                new Partition(0),
+                                new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                                    DUMMY_OFFSET, DUMMY_OFFSET, DUMMY_OFFSET));
+                    return new SystemStreamMetadata(stream, partitionMetadata);
+                  }));
+    }
+
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      return 0;
+    }
+  }
+
+  /** System consumer for SamzaTestStreamSystem. */
+  public static class SmazaTestStreamSystemConsumer<T> implements SystemConsumer {
+    TestStream<T> testStream;
+
+    public SmazaTestStreamSystemConsumer(TestStream<T> testStream) {
+      this.testStream = testStream;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+
+    /**
+     * Emits every {@link TestStream} event, up to the max watermark, for the first requested
+     * partition. Returns an empty map when no partition is requested.
+     */
+    @Override
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+        Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+        throws InterruptedException {
+      if (systemStreamPartitions.isEmpty()) {
+        // Nothing to serve; iterator().next() below would throw NoSuchElementException.
+        return Collections.emptyMap();
+      }
+      final SystemStreamPartition ssp = systemStreamPartitions.iterator().next();
+      final List<IncomingMessageEnvelope> messages = new ArrayList<>();
+
+      for (TestStream.Event<T> event : testStream.getEvents()) {
+        if (event.getType().equals(TestStream.EventType.ELEMENT)) {
+          // One message per element, carrying the element's value and timestamp.
+          for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
+            final WindowedValue<T> windowedValue =
+                WindowedValue.timestampedValueInGlobalWindow(
+                    element.getValue(), element.getTimestamp());
+            final OpMessage<T> opMessage = OpMessage.ofElement(windowedValue);
+            messages.add(new IncomingMessageEnvelope(ssp, DUMMY_OFFSET, null, opMessage));
+          }
+        } else if (event.getType().equals(TestStream.EventType.WATERMARK)) {
+          // If event type is watermark, create a watermark message.
+          long watermarkMillis = ((TestStream.WatermarkEvent<T>) event).getWatermark().getMillis();
+          messages.add(IncomingMessageEnvelope.buildWatermarkEnvelope(ssp, watermarkMillis));
+          if (watermarkMillis == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+            // The max watermark closes the stream: append end-of-stream and stop translating.
+            messages.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
+            break;
+          }
+        } else if (event.getType().equals(TestStream.EventType.PROCESSING_TIME)) {
+          throw new UnsupportedOperationException(
+              "Advancing Processing time is not supported by the Samza Runner.");
+        } else {
+          throw new SamzaException("Unknown event type " + event.getType());
+        }
+      }
+      return ImmutableMap.of(ssp, messages);
+    }
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
new file mode 100644
index 0000000..ef38a79
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+
+/**
+ * Translate {@link org.apache.beam.sdk.testing.TestStream} to a samza message stream produced by
+ * {@link
+ * org.apache.beam.runners.samza.translation.SamzaTestStreamSystemFactory.SmazaTestStreamSystemConsumer}.
+ */
+@SuppressWarnings({"rawtypes"})
+public class SamzaTestStreamTranslator<T> implements TransformTranslator<TestStream<T>> {
+
+  /** Translates {@code testStream} into an input of {@link SamzaTestStreamSystemFactory}. */
+  @Override
+  public void translate(
+      TestStream<T> testStream, TransformHierarchy.Node node, TranslationContext ctx) {
+    final PCollection<T> output = ctx.getOutput(testStream);
+    final String streamId = ctx.getIdForPValue(output);
+    final Coder<T> valueCoder = testStream.getValueCoder();
+    final TestStream.TestStreamCoder<T> streamCoder = TestStream.TestStreamCoder.of(valueCoder);
+    final GenericSystemDescriptor descriptor =
+        new GenericSystemDescriptor(streamId, SamzaTestStreamSystemFactory.class.getName());
+
+    // The whole stream travels to the consumer as a base64 string in the system config.
+    final String encodedStream;
+    try {
+      encodedStream = CoderUtils.encodeToBase64(streamCoder, testStream);
+    } catch (CoderException e) {
+      throw new SamzaException("Could not encode TestStream.", e);
+    }
+
+    // Serialized next to the payload so the system factory can rebuild the stream from config.
+    final SerializableFunction<String, TestStream<T>> decoder =
+        base64 -> {
+          try {
+            return CoderUtils.decodeFromBase64(TestStream.TestStreamCoder.of(valueCoder), base64);
+          } catch (CoderException e) {
+            throw new SamzaException("Could not decode TestStream.", e);
+          }
+        };
+
+    final Map<String, String> systemConfig =
+        ImmutableMap.of(
+            "encodedTestStream", encodedStream,
+            "testStreamDecoder", Base64Serializer.serializeUnchecked(decoder));
+    descriptor.withSystemConfigs(systemConfig);
+
+    // The KvCoder is needed here for Samza not to crop the key.
+    final Serde<KV<?, OpMessage<byte[]>>> kvSerde = KVSerde.of(new NoOpSerde(), new NoOpSerde<>());
+    final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
+        descriptor.getInputDescriptor(streamId, kvSerde);
+
+    ctx.registerInputMessageStream(output, inputDescriptor);
+  }
+
+  /** Always fails: this translator has no portable implementation. */
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
index e2a37c2..91dc2a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
@@ -119,8 +119,8 @@ public class SplittableParDoTranslators {
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
partitionedInputStream
- .flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
- .flatMap(
+ .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+ .flatMapAsync(
OpAdapter.adapt(
new SplittableParDoProcessKeyedElementsOp<>(
transform.getMainOutputTag(),
@@ -139,7 +139,7 @@ public class SplittableParDoTranslators {
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index a298f38..5a69628 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -17,8 +17,12 @@
*/
package org.apache.beam.runners.samza.translation;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.beam.runners.core.construction.TransformInputs;
@@ -93,26 +97,38 @@ public class TranslationContext {
}
public <OutT> void registerInputMessageStream(
- PValue pvalue,
- InputDescriptor<org.apache.samza.operators.KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
- // we want to register it with the Samza graph only once per i/o stream
- final String streamId = inputDescriptor.getStreamId();
- if (registeredInputStreams.containsKey(streamId)) {
- MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
- LOG.info(
- String.format(
- "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
- streamId, messageStream, pvalue));
- registerMessageStream(pvalue, messageStream);
-
- return;
- }
- @SuppressWarnings("unchecked")
- final MessageStream<OpMessage<OutT>> typedStream =
- getValueStream(appDescriptor.getInputStream(inputDescriptor));
+ PValue pvalue, InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
+ registerInputMessageStreams(pvalue, Collections.singletonList(inputDescriptor));
+ }
- registerMessageStream(pvalue, typedStream);
- registeredInputStreams.put(streamId, typedStream);
+ /**
+ * Registers the merge of the message streams of all given inputs as the stream of a PCollection.
+ *
+ * @param pvalue output of a transform
+ * @param inputDescriptors a list of Samza InputDescriptors
+ */
+ public <OutT> void registerInputMessageStreams(
+ PValue pvalue, List<? extends InputDescriptor<KV<?, OpMessage<OutT>>, ?>> inputDescriptors) {
+ final Set<MessageStream<OpMessage<OutT>>> streamsToMerge = new HashSet<>();
+ for (InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor : inputDescriptors) {
+ final String streamId = inputDescriptor.getStreamId();
+ // each streamId registered in the map should already have been added to messageStreamMap
+ if (registeredInputStreams.containsKey(streamId)) {
+ @SuppressWarnings("unchecked")
+ MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
+ LOG.info(
+ String.format(
+ "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
+ streamId, messageStream, pvalue));
+ streamsToMerge.add(messageStream);
+ } else {
+ final MessageStream<OpMessage<OutT>> typedStream =
+ getValueStream(appDescriptor.getInputStream(inputDescriptor));
+ registeredInputStreams.put(streamId, typedStream);
+ streamsToMerge.add(typedStream);
+ }
+ }
+ registerMessageStream(pvalue, MessageStream.mergeAll(streamsToMerge));
}
public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
@@ -204,9 +220,8 @@ public class TranslationContext {
tableDesc.getTableId(), id -> appDescriptor.getTable(tableDesc));
}
- private static <T> MessageStream<T> getValueStream(
- MessageStream<org.apache.samza.operators.KV<?, T>> input) {
- return input.map(org.apache.samza.operators.KV::getValue);
+ private static <T> MessageStream<T> getValueStream(MessageStream<KV<?, T>> input) {
+ return input.map(KV::getValue);
}
public String getIdForPValue(PValue pvalue) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
index 114a256..a8790c5 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
@@ -47,7 +47,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
final MessageStream<OpMessage<T>> inputStream = ctx.getMessageStream(ctx.getInput(transform));
final MessageStream<OpMessage<T>> outputStream =
- inputStream.flatMap(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+ inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
ctx.registerMessageStream(output, outputStream);
}
@@ -73,7 +73,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
final MessageStream<OpMessage<T>> inputStream = ctx.getOneInputMessageStream(transform);
final MessageStream<OpMessage<T>> outputStream =
- inputStream.flatMap(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+ inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java
new file mode 100644
index 0000000..09ad77b
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A util class to handle java 8 {@link CompletableFuture} and {@link CompletionStage}. */
+@SuppressWarnings({"rawtypes"})
+public final class FutureUtils {
+  /**
+   * Flattens the input futures into a single future comprising the results of all of them.
+   *
+   * @param inputFutures stages to flatten; any {@link CompletionStage} implementation is accepted
+   * @param <T> result type of the input future
+   * @return a single {@link CompletionStage} that contains the results of all the input futures.
+   */
+  public static <T> CompletionStage<Collection<T>> flattenFutures(
+      Collection<CompletionStage<T>> inputFutures) {
+    final CompletableFuture<T>[] futures =
+        inputFutures.stream()
+            .map(CompletionStage::toCompletableFuture)
+            .toArray(CompletableFuture[]::new);
+    return CompletableFuture.allOf(futures).thenApply(ignored -> joinAll(futures));
+  }
+
+  /** Joins futures that allOf has already seen complete, so no call blocks. */
+  private static <T> List<T> joinAll(CompletableFuture<T>[] futures) {
+    return Stream.of(futures).map(CompletableFuture::join).collect(Collectors.toList());
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java
new file mode 100644
index 0000000..a5b03a2
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.runners.samza.SamzaPipelineOptionsValidator.validateBundlingRelatedOptions;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+/** Test for {@link SamzaPipelineOptionsValidator}. */
+public class SamzaPipelineOptionsValidatorTest {
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBundleEnabledInMultiThreadedModeThrowsException() {
+    SamzaPipelineOptions mockOptions = mock(SamzaPipelineOptions.class);
+    Map<String, String> config = ImmutableMap.of(JOB_CONTAINER_THREAD_POOL_SIZE, "10");
+    when(mockOptions.getMaxBundleSize()).thenReturn(2L);
+    when(mockOptions.getConfigOverride()).thenReturn(config);
+    validateBundlingRelatedOptions(mockOptions);
+  }
+
+  @Test
+  public void testBundleEnabledInSingleThreadedMode() {
+    SamzaPipelineOptions mockOptions = mock(SamzaPipelineOptions.class);
+    when(mockOptions.getMaxBundleSize()).thenReturn(2L);
+    try {
+      Map<String, String> config = ImmutableMap.of(JOB_CONTAINER_THREAD_POOL_SIZE, "1");
+      when(mockOptions.getConfigOverride()).thenReturn(config);
+      validateBundlingRelatedOptions(mockOptions);
+
+      // In the absence of configuration make sure it is treated as single threaded mode.
+      when(mockOptions.getConfigOverride()).thenReturn(Collections.emptyMap());
+      validateBundlingRelatedOptions(mockOptions);
+    } catch (Exception e) {
+      // Chain the cause so the test report shows which validation failed and why.
+      throw new AssertionError(
+          "Bundle size > 1 should be supported in single threaded mode", e);
+    }
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
index fd61bc0..cd92077 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.samza.adapter;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createElementMessage;
+import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createEndOfStreamMessage;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createWatermarkMessage;
import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.expectWrappedException;
import static org.junit.Assert.assertEquals;
@@ -102,6 +103,33 @@ public class UnboundedSourceSystemTest {
}
@Test
+ public void testMaxWatermarkTriggersEndOfStreamMessage()
+ throws IOException, InterruptedException {
+ final TestUnboundedSource<String> source =
+ TestUnboundedSource.<String>createBuilder()
+ .addElements("test")
+ .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE)
+ .build();
+
+ final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
+ createConsumer(source);
+ // Two drains together must yield the element, the max watermark, then end-of-stream.
+ consumer.register(DEFAULT_SSP, NULL_STRING);
+ consumer.start();
+ List<IncomingMessageEnvelope> actualList =
+ consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
+ actualList.addAll(
+ consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
+ assertEquals(
+ Arrays.asList(
+ createElementMessage(DEFAULT_SSP, offset(0), "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
+ createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
+ createEndOfStreamMessage(DEFAULT_SSP)),
+ actualList);
+ consumer.stop();
+ }
+
+ @Test
public void testAdvanceTimestamp() throws IOException, InterruptedException {
final Instant timestamp = Instant.now();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
new file mode 100644
index 0000000..4baf7be
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@linkplain BundleManager}. */
+@SuppressWarnings({"nullness"})
+// TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+public final class BundleManagerTest {
+ private static final long MAX_BUNDLE_SIZE = 3;
+ private static final long MAX_BUNDLE_TIME_MS = 2000;
+ private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer";
+
+ private FutureCollector<String> mockFutureCollector;
+ private BundleManager<String> bundleManager;
+ private BundleManager.BundleProgressListener<String> bundleProgressListener;
+ private Scheduler<KeyedTimerData<Void>> mockScheduler;
+
+ @Before
+ public void setUp() {
+ mockFutureCollector = mock(FutureCollector.class);
+ bundleProgressListener = mock(BundleManager.BundleProgressListener.class);
+ mockScheduler = mock(Scheduler.class);
+ bundleManager =
+ new BundleManager<>(
+ bundleProgressListener,
+ mockFutureCollector,
+ MAX_BUNDLE_SIZE,
+ MAX_BUNDLE_TIME_MS,
+ mockScheduler,
+ BUNDLE_CHECK_TIMER_ID);
+ }
+
+ @Test
+ public void testTryStartBundleStartsBundle() {
+ bundleManager.tryStartBundle();
+
+ verify(bundleProgressListener, times(1)).onBundleStarted();
+ assertEquals(
+ "Expected the number of element in the current bundle to be 1",
+ 1L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+ assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testTryStartBundleThrowsExceptionAndSignalError() {
+ bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
+ try {
+ bundleManager.tryStartBundle();
+ } catch (IllegalArgumentException e) {
+ bundleManager.signalFailure(e);
+ }
+
+ // verify if the signal failure only resets appropriate attributes of bundle
+ verify(mockFutureCollector, times(1)).prepare();
+ verify(mockFutureCollector, times(1)).discard();
+ assertEquals(
+ "Expected the number of element in the current bundle to 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testTryStartBundleThrowsExceptionFromTheListener() {
+ doThrow(new RuntimeException("User start bundle threw an exception"))
+ .when(bundleProgressListener)
+ .onBundleStarted();
+
+ try {
+ bundleManager.tryStartBundle();
+ } catch (RuntimeException e) {
+ bundleManager.signalFailure(e);
+ }
+
+ // verify if the signal failure only resets appropriate attributes of bundle
+ verify(mockFutureCollector, times(1)).prepare();
+ verify(mockFutureCollector, times(1)).discard();
+ assertEquals(
+ "Expected the number of element in the current bundle to 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testMultipleStartBundle() {
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+
+ // second invocation should not start the bundle
+ verify(bundleProgressListener, times(1)).onBundleStarted();
+ assertEquals(
+ "Expected the number of element in the current bundle to be 2",
+ 2L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+ assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
+ }
+
+ /*
+ * Set up the bundle manager with the default max bundle size of 3 and max bundle close timeout of 2 seconds.
+ * The test verifies the following
+ * 1. Bundle gets closed on tryFinishBundle()
+ * a. pending bundle count == 0
+ * b. element in current bundle == 0
+ * c. isBundleStarted == false
+ * 2. onBundleFinished callback is invoked on the progress listener
+ */
+ @Test
+ public void testTryFinishBundleClosesBundle() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+ bundleManager.tryFinishBundle(mockEmitter);
+
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testTryFinishBundleClosesBundleOnMaxWatermark() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+ bundleManager.setBundleWatermarkHold(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+ bundleManager.tryFinishBundle(mockEmitter);
+
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+ }
+
+ /*
+ * With the default limits, a bundle holding one element (max bundle size is 3) must stay open after tryFinishBundle().
+ */
+ @Test
+ public void testTryFinishBundleShouldNotCloseBundle() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+ bundleManager.tryStartBundle();
+ bundleManager.tryFinishBundle(mockEmitter);
+
+ verify(mockFutureCollector, times(1)).finish();
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 1",
+ 1L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+ assertTrue("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testTryFinishBundleWhenNoBundleInProgress() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ when(mockFutureCollector.finish())
+ .thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+ bundleManager.tryFinishBundle(mockEmitter);
+
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ assertNull(
+ "tryFinishBundle() should not set the future when no bundle in progress",
+ bundleManager.getCurrentBundleDoneFuture());
+ }
+
+ @Test
+ public void testProcessWatermarkWhenNoBundleInProgress() {
+ Instant now = Instant.now();
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ bundleManager.processWatermark(now, mockEmitter);
+ verify(bundleProgressListener, times(1)).onWatermark(now, mockEmitter);
+ }
+
+ /*
+ * The test validates processing a watermark while a bundle is in progress and also validates
+ * that the watermark hold is propagated downstream after the output futures are resolved.
+ */
+ @Test
+ public void testProcessWatermarkWithPendingBundles() {
+ CountDownLatch latch = new CountDownLatch(1);
+ Instant watermark = Instant.now();
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+
+ // We need to capture the finish bundle future to know if we can check for output watermark
+ // and verify other callbacks get invoked.
+ Class<CompletionStage<Collection<WindowedValue<String>>>> outputFutureClass =
+ (Class<CompletionStage<Collection<WindowedValue<String>>>>) (Class) CompletionStage.class;
+ ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor =
+ ArgumentCaptor.forClass(outputFutureClass);
+
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError("Test interrupted when waiting for latch", e);
+ }
+
+ return Collections.singleton(mock(WindowedValue.class));
+ }));
+
+ testWatermarkHoldWhenPendingBundleInProgress(mockEmitter, captor, watermark);
+ testWatermarkHoldPropagatesAfterFutureResolution(mockEmitter, captor, latch, watermark);
+ }
+
+ @Test
+ public void testMaxWatermarkPropagationForPendingBundle() {
+ Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ bundleManager.setPendingBundleCount(1);
+ bundleManager.processWatermark(watermark, mockEmitter);
+ verify(bundleProgressListener, times(1)).onWatermark(watermark, mockEmitter);
+ }
+
+ @Test
+ public void testMaxWatermarkWithBundleInProgress() {
+ Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+
+ // should force close bundle
+ bundleManager.processWatermark(watermark, mockEmitter);
+ verify(bundleProgressListener, times(1)).onWatermark(watermark, mockEmitter);
+ }
+
+ @Test
+ public void testProcessTimerWithBundleTimeElapsed() {
+ BundleManager<String> bundleManager =
+ new BundleManager<>(
+ bundleProgressListener,
+ mockFutureCollector,
+ MAX_BUNDLE_SIZE,
+ 0,
+ mockScheduler,
+ BUNDLE_CHECK_TIMER_ID);
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+ TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+ when(mockFutureCollector.finish())
+ .thenReturn(
+ CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+ when(mockTimerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
+ when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+ bundleManager.tryStartBundle();
+ bundleManager.processTimer(mockTimer, mockEmitter);
+
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testProcessTimerWithTimeLessThanMaxBundleTime() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+ TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+ when(mockTimerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
+ when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+ when(mockFutureCollector.finish())
+ .thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+ bundleManager.tryStartBundle();
+ bundleManager.processTimer(mockTimer, mockEmitter);
+
+ verify(mockFutureCollector, times(1)).finish();
+ verify(mockEmitter, times(1)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 1",
+ 1L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+ assertTrue("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testProcessTimerIgnoresNonBundleTimers() {
+ OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+ KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+ TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+ when(mockTimerData.getTimerId()).thenReturn("NotBundleTimer");
+ when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+ bundleManager.tryStartBundle();
+ bundleManager.processTimer(mockTimer, mockEmitter);
+
+ verify(mockFutureCollector, times(0)).finish();
+ verify(mockEmitter, times(0)).emitFuture(anyObject());
+ verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+ assertEquals(
+ "Expected the number of element in the current bundle to be 1",
+ 1L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+ assertTrue("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+ }
+
+ @Test
+ public void testSignalFailureResetsTheBundleAndCollector() {
+ bundleManager.tryStartBundle();
+
+ bundleManager.signalFailure(mock(Throwable.class));
+ verify(mockFutureCollector, times(1)).prepare();
+ verify(mockFutureCollector, times(1)).discard();
+ assertEquals(
+ "Expected the number of element in the current bundle to 0",
+ 0L,
+ bundleManager.getCurrentBundleElementCount());
+ assertEquals(
+ "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+ assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+ }
+
+ /*
+ * We validate the following
+ * 1. Process watermark is held since there is a pending bundle.
+ * 2. Watermark propagates down stream once the output future is resolved.
+ * 3. The watermark propagated is the one that was held before closing the bundle
+ * 4. onBundleFinished and onWatermark callbacks are triggered
+ * 5. Pending bundle count is decremented once the future is resolved
+ */
+ private void testWatermarkHoldPropagatesAfterFutureResolution(
+ OpEmitter<String> mockEmitter,
+ ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor,
+ CountDownLatch latch,
+ Instant sealedWatermark) {
+ Instant higherWatermark = Instant.now();
+
+ // Processing a watermark should hold it again because the sealed bundle is still pending
+ bundleManager.processWatermark(higherWatermark, mockEmitter);
+ verify(bundleProgressListener, times(0)).onWatermark(higherWatermark, mockEmitter);
+
+ // Resolving the process output futures should result in watermark propagation
+ latch.countDown();
+ CompletionStage<Void> validationFuture =
+ captor
+ .getValue()
+ .thenAccept(
+ results -> {
+ verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+ verify(bundleProgressListener, times(1))
+ .onWatermark(sealedWatermark, mockEmitter);
+ assertEquals(
+ "Expected the pending bundle count to be 0",
+ 0L,
+ bundleManager.getPendingBundleCount());
+ });
+
+ validationFuture.toCompletableFuture().join();
+ }
+
+ /*
+ * We validate the following
+ * 1. Watermark is held since there is a bundle in progress
+ * 2. Callbacks are not invoked when tryFinishBundle() is invoked since the future is unresolved
+ * 3. Watermark hold is sealed and output future is emitted
+ */
+ private void testWatermarkHoldWhenPendingBundleInProgress(
+ OpEmitter<String> mockEmitter,
+ ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor,
+ Instant watermark) {
+ // Start the bundle and reach the max bundle size so that tryFinishBundle() seals the current
+ // bundle
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+ bundleManager.tryStartBundle();
+
+ bundleManager.processWatermark(watermark, mockEmitter);
+ verify(bundleProgressListener, times(0)).onWatermark(watermark, mockEmitter);
+
+ // The bundle is sealed but still unresolved because the latch has not been counted down yet.
+ bundleManager.tryFinishBundle(mockEmitter);
+ verify(mockFutureCollector, times(1)).finish();
+ verify(mockEmitter, times(1)).emitFuture(captor.capture());
+ assertFalse("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+ }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
new file mode 100644
index 0000000..f126dd1
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */
+public final class FutureCollectorImplTest {
+ private static final List<String> RESULTS = ImmutableList.of("hello", "world");
+ private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+
+ @Before
+ public void setup() {
+ futureCollector = new DoFnOp.FutureCollectorImpl<>();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAddWithoutPrepareCallThrowsException() {
+ futureCollector.add(mock(CompletionStage.class));
+ }
+
+ @Test
+ public void testFinishWithoutPrepareReturnsEmptyCollection() {
+ CompletionStage<Collection<WindowedValue<String>>> resultFuture = futureCollector.finish();
+ CompletionStage<Void> validationFuture =
+ resultFuture.thenAccept(
+ result -> {
+ Assert.assertTrue("Expected the result to be empty", result.isEmpty());
+ });
+ validationFuture.toCompletableFuture().join();
+ }
+
+ @Test
+ public void testFinishReturnsExpectedResults() {
+ WindowedValue<String> mockWindowedValue = mock(WindowedValue.class);
+
+ when(mockWindowedValue.getValue()).thenReturn("hello").thenReturn("world");
+
+ futureCollector.prepare();
+ futureCollector.add(CompletableFuture.completedFuture(mockWindowedValue));
+ futureCollector.add(CompletableFuture.completedFuture(mockWindowedValue));
+
+ CompletionStage<Collection<WindowedValue<String>>> resultFuture = futureCollector.finish();
+ CompletionStage<Void> validationFuture =
+ resultFuture.thenAccept(
+ results -> {
+ List<String> actualResults =
+ results.stream().map(WindowedValue::getValue).collect(Collectors.toList());
+ Assert.assertEquals(
+ "Expected the result to be {hello, world}", RESULTS, actualResults);
+ });
+ validationFuture.toCompletableFuture().join();
+ }
+
+ @Test
+ public void testMultiplePrepareCallsWithoutFinishThrowsException() {
+ futureCollector.prepare();
+ // A second prepare() without a finish() in between must throw; the catch below is the pass path.
+ try {
+ futureCollector.prepare();
+ Assert.fail("Second invocation of prepare should throw IllegalStateException");
+ } catch (IllegalStateException expected) {
+ }
+ }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index a18d875..d3da93a 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -36,18 +35,14 @@ public class KeyedTimerDataTest {
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
private static final Instant TIMESTAMP =
new DateTime(2020, 8, 11, 13, 42, 9, DateTimeZone.UTC).toInstant();
- private static final Instant OUTPUT_TIMESTAMP = TIMESTAMP.plus(Duration.standardSeconds(30));
+ // TODO: LISAMZA-19205 Test OUTPUT_TIMESTAMP after outputTimestamp is encoded
+ // private static final Instant OUTPUT_TIMESTAMP = TIMESTAMP.plus(Duration.standardSeconds(30));
@Test
public void testCoder() throws Exception {
final TimerInternals.TimerData td =
TimerInternals.TimerData.of(
- "timer",
- "timerFamily",
- StateNamespaces.global(),
- TIMESTAMP,
- OUTPUT_TIMESTAMP,
- TimeDomain.EVENT_TIME);
+ "timer", StateNamespaces.global(), TIMESTAMP, TIMESTAMP, TimeDomain.EVENT_TIME);
final String key = "timer-key";
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -58,6 +53,7 @@ public class KeyedTimerDataTest {
final KeyedTimerData.KeyedTimerDataCoder<String> ktdCoder =
new KeyedTimerData.KeyedTimerDataCoder<>(STRING_CODER, GlobalWindow.Coder.INSTANCE);
- CoderProperties.coderDecodeEncodeEqual(ktdCoder, ktd);
+ // TODO: LISAMZA-19205: use CoderProperties.coderDecodeEncodeEqual
+ CoderProperties.coderDecodeEncodeEqualInContext(ktdCoder, Coder.Context.OUTER, ktd);
}
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
index 1291e4c..533828a 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.runners.samza.runtime;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,9 +34,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.TestSamzaRunner;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValue;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValueSerdeFactory;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -58,6 +64,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
@@ -206,16 +213,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
/** A test store based on InMemoryKeyValueStore. */
public static class TestStore extends InMemoryKeyValueStore {
static List<TestKeyValueIteraor> iterators = Collections.synchronizedList(new ArrayList<>());
- private final KeyValueStoreMetrics metrics;
public TestStore(KeyValueStoreMetrics metrics) {
super(metrics);
- this.metrics = metrics;
- }
-
- @Override
- public KeyValueStoreMetrics metrics() {
- return metrics;
}
@Override
@@ -285,7 +285,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 42), KV.of("hello", 12)))
.apply(ParDo.of(fn));
- Map<String, String> configs = new HashMap(ConfigBuilder.localRunConfig());
+ SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ options.setRunner(TestSamzaRunner.class);
+ Map<String, String> configs = new HashMap<>(ConfigBuilder.localRunConfig());
configs.put("stores.foo.factory", TestStorageEngine.class.getName());
pipeline.getOptions().as(SamzaPipelineOptions.class).setConfigOverride(configs);
pipeline.run();
@@ -295,4 +297,24 @@ public class SamzaStoreStateInternalsTest implements Serializable {
assertEquals(8, TestStore.iterators.size());
TestStore.iterators.forEach(iter -> assertTrue(iter.closed));
}
+
+ @Test
+ public void testStateValueSerde() throws IOException {
+ StateValueSerdeFactory stateValueSerdeFactory = new StateValueSerdeFactory();
+ Serde<StateValue<Integer>> serde = (Serde) stateValueSerdeFactory.getSerde("Test", null);
+ int value = 123;
+ Coder<Integer> coder = VarIntCoder.of();
+ // Round-trip a non-null value through the serde and through StateValue.of(bytes).
+ byte[] valueBytes = serde.toBytes(StateValue.of(value, coder));
+ StateValue<Integer> stateValue1 = serde.fromBytes(valueBytes);
+ StateValue<Integer> stateValue2 = StateValue.of(valueBytes);
+ assertEquals(value, stateValue1.getValue(coder).intValue());
+ assertEquals(value, stateValue2.getValue(coder).intValue());
+ // A null value is expected to serialize to null bytes and to decode back to a null value.
+ Integer nullValue = null;
+ byte[] nullBytes = serde.toBytes(StateValue.of(nullValue, coder));
+ StateValue<Integer> nullStateValue = serde.fromBytes(nullBytes);
+ assertNull(nullBytes);
+ assertNull(nullStateValue.getValue(coder));
+ }
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 9af37a9..4c750b5 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -38,6 +38,8 @@ import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.ByteArray;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.ByteArraySerdeFactory;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValue;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValueSerdeFactory;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
@@ -48,7 +50,6 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.Scheduler;
-import org.apache.samza.serializers.ByteSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
@@ -74,7 +75,7 @@ import org.rocksdb.WriteOptions;
public class SamzaTimerInternalsFactoryTest {
@Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
- private KeyValueStore<ByteArray, byte[]> createStore() {
+ private KeyValueStore<ByteArray, StateValue<?>> createStore() {
final Options options = new Options();
options.setCreateIfMissing(true);
@@ -92,12 +93,12 @@ public class SamzaTimerInternalsFactoryTest {
return new SerializedKeyValueStore<>(
rocksStore,
new ByteArraySerdeFactory.ByteArraySerde(),
- new ByteSerde(),
+ new StateValueSerdeFactory.StateValueSerde(),
new SerializedKeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));
}
private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
- SamzaPipelineOptions pipelineOptions, KeyValueStore<ByteArray, byte[]> store) {
+ SamzaPipelineOptions pipelineOptions, KeyValueStore<ByteArray, StateValue<?>> store) {
final TaskContext context = mock(TaskContext.class);
when(context.getStore(anyString())).thenReturn((KeyValueStore) store);
final TupleTag<?> mainOutputTag = new TupleTag<>("output");
@@ -110,7 +111,7 @@ public class SamzaTimerInternalsFactoryTest {
Scheduler<KeyedTimerData<String>> timerRegistry,
String timerStateId,
SamzaPipelineOptions pipelineOptions,
- KeyValueStore<ByteArray, byte[]> store) {
+ KeyValueStore<ByteArray, StateValue<?>> store) {
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
createNonKeyedStateInternalsFactory(pipelineOptions, store);
@@ -144,7 +145,7 @@ public class SamzaTimerInternalsFactoryTest {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
- final KeyValueStore<ByteArray, byte[]> store = createStore();
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
final SamzaTimerInternalsFactory<String> timerInternalsFactory =
createTimerInternalsFactory(null, "timer", pipelineOptions, store);
@@ -178,11 +179,68 @@ public class SamzaTimerInternalsFactoryTest {
}
@Test
+ public void testRestoreEventBufferSize() throws Exception {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+
+ KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final String key = "testKey";
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
+ final TimerInternals.TimerData timer1 =
+ TimerInternals.TimerData.of(
+ "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer1);
+
+ store.close();
+
+ // restore by creating a new instance
+ store = createStore();
+
+ final SamzaTimerInternalsFactory<String> restoredFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+ assertEquals(1, restoredFactory.getEventTimeBuffer().size());
+
+ restoredFactory.setInputWatermark(new Instant(150));
+ Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
+ assertEquals(1, readyTimers.size());
+
+ // Timer 1 should be evicted from buffer
+ assertTrue(restoredFactory.getEventTimeBuffer().isEmpty());
+ final TimerInternals restoredTimerInternals = restoredFactory.timerInternalsForKey(key);
+ final TimerInternals.TimerData timer2 =
+ TimerInternals.TimerData.of(
+ "timer2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME);
+ restoredTimerInternals.setTimer(timer2);
+
+ // Timer 2 should be added to the Event buffer
+ assertEquals(1, restoredFactory.getEventTimeBuffer().size());
+ // Timer 2 should not be ready
+ readyTimers = restoredFactory.removeReadyTimers();
+ assertEquals(0, readyTimers.size());
+
+ restoredFactory.setInputWatermark(new Instant(250));
+
+ // Timer 2 should be ready
+ readyTimers = restoredFactory.removeReadyTimers();
+ assertEquals(1, readyTimers.size());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ StringUtf8Coder.of().encode(key, baos);
+ byte[] keyBytes = baos.toByteArray();
+ assertEquals(readyTimers, Arrays.asList(new KeyedTimerData<>(keyBytes, key, timer2)));
+
+ store.close();
+ }
+
+ @Test
public void testRestore() throws Exception {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
- KeyValueStore<ByteArray, byte[]> store = createStore();
+ KeyValueStore<ByteArray, StateValue<?>> store = createStore();
final SamzaTimerInternalsFactory<String> timerInternalsFactory =
createTimerInternalsFactory(null, "timer", pipelineOptions, store);
@@ -227,7 +285,7 @@ public class SamzaTimerInternalsFactoryTest {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
- KeyValueStore<ByteArray, byte[]> store = createStore();
+ KeyValueStore<ByteArray, StateValue<?>> store = createStore();
TestTimerRegistry timerRegistry = new TestTimerRegistry();
final SamzaTimerInternalsFactory<String> timerInternalsFactory =
@@ -271,7 +329,7 @@ public class SamzaTimerInternalsFactoryTest {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
- KeyValueStore<ByteArray, byte[]> store = createStore();
+ KeyValueStore<ByteArray, StateValue<?>> store = createStore();
final SamzaTimerInternalsFactory<String> timerInternalsFactory =
createTimerInternalsFactory(null, "timer", pipelineOptions, store);
@@ -309,6 +367,346 @@ public class SamzaTimerInternalsFactoryTest {
store.close();
}
+ /**
+ * Test the number of expired event timers for each watermark does not exceed the predefined
+ * limit.
+ */
+ @Test
+ public void testMaxExpiredEventTimersProcessAtOnce() {
+ // If maxExpiredTimersToProcessOnce <= the number of expired timers, then load
+ // "maxExpiredTimersToProcessOnce" timers.
+ testMaxExpiredEventTimersProcessAtOnce(10, 10, 5, 5);
+ testMaxExpiredEventTimersProcessAtOnce(10, 10, 10, 10);
+
+ // If maxExpiredTimersToProcessOnce > the number of expired timers, then load all the ready
+ // timers.
+ testMaxExpiredEventTimersProcessAtOnce(10, 10, 20, 10);
+ }
+
+ private void testMaxExpiredEventTimersProcessAtOnce(
+ int totalNumberOfTimersInStore,
+ int totalNumberOfExpiredTimers,
+ int maxExpiredTimersToProcessOnce,
+ int expectedExpiredTimersToProcess) {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setMaxReadyTimersToProcessOnce(maxExpiredTimersToProcessOnce);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < totalNumberOfTimersInStore; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // Set the timestamp of the input watermark to be the value of totalNumberOfExpiredTimers
+ // so that totalNumberOfExpiredTimers timers are expected to be expired with respect to this
+ // watermark.
+ final Instant inputWatermark = new Instant(totalNumberOfExpiredTimers);
+ timerInternalsFactory.setInputWatermark(inputWatermark);
+ final Collection<KeyedTimerData<String>> readyTimers =
+ timerInternalsFactory.removeReadyTimers();
+ assertEquals(expectedExpiredTimersToProcess, readyTimers.size());
+ store.close();
+ }
+
+ /**
+ * Test the number of event time timers maintained in memory does not go beyond the limit defined
+ * in pipeline option.
+ */
+ @Test
+ public void testEventTimeTimersMemoryBoundary1() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setEventTimerBufferSize(2);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ // prepare 5 timers.
+ // timers in memory are then timestamped from 0 - 1;
+ // timers in store are then timestamped from 0 - 4.
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < 5; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ timerInternalsFactory.setInputWatermark(new Instant(2));
+ Collection<KeyedTimerData<String>> readyTimers;
+
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertEquals(2, readyTimers.size());
+ assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
+
+ store.close();
+ }
+
+ /**
+ * Test the total number of event time timers reloaded into memory is aligned with the number of
+ * event time timers written to the store.
+ */
+ @Test
+ public void testEventTimeTimersMemoryBoundary2() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setEventTimerBufferSize(2);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ // prepare 3 timers.
+ // timers in memory now are timestamped from 0 - 1;
+ // timers in store now are timestamped from 0 - 2.
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < 3; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // total number of event time timers to fire equals to the number of timers in store
+ Collection<KeyedTimerData<String>> readyTimers;
+ timerInternalsFactory.setInputWatermark(new Instant(3));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertEquals(3, readyTimers.size());
+
+ store.close();
+ }
+
+ /**
+ * Test the total number of event time timers reloaded into memory is aligned with the number of
+ * the event time timers written to the store. Moreover, event time timers reloaded into memory
+ * are maintained in order.
+ */
+ @Test
+ public void testEventTimeTimersMemoryBoundary3() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setEventTimerBufferSize(5);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ // prepare 8 timers.
+ // timers in memory now are timestamped from 0 - 4;
+ // timers in store now are timestamped from 0 - 7.
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < 8; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // fire the first 2 timers.
+ // timers in memory now are timestamped from 2 - 4;
+ // timers in store now are timestamped from 2 - 7.
+ Collection<KeyedTimerData<String>> readyTimers;
+ timerInternalsFactory.setInputWatermark(new Instant(2));
+ long lastTimestamp = 0;
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(2, readyTimers.size());
+
+ // add another 12 timers.
+ // timers in memory (reloaded for three times) now are timestamped from 2 - 4; 5 - 9; 10 - 14;
+ // 15 - 19.
+ // timers in store now are timestamped from 2 - 19.
+ // the total number of timers to fire is 18.
+ for (int i = 8; i < 20; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+ timerInternalsFactory.setInputWatermark(new Instant(20));
+ lastTimestamp = 0;
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(18, readyTimers.size());
+
+ store.close();
+ }
+
+ /**
+ * Test the total number of event time timers reloaded into memory is aligned with the number of
+ * the event time timers written to the store. Moreover, event time timers reloaded into memory
+ * are maintained in order, even when the memory boundary is hit and a timer is earlier than the
+ * last timer in memory.
+ */
+ @Test
+ public void testEventTimeTimersMemoryBoundary4() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setEventTimerBufferSize(5);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ // prepare 10 timers.
+ // timers in memory now are timestamped from 0 - 4;
+ // timers in store now are timestamped from 0 - 9.
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < 10; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // fire the first 2 timers.
+ // timers in memory now are timestamped from 2 - 4;
+ // timers in store now are timestamped from 2 - 9.
+ Collection<KeyedTimerData<String>> readyTimers;
+ timerInternalsFactory.setInputWatermark(new Instant(2));
+ long lastTimestamp = 0;
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(2, readyTimers.size());
+
+ // add 3 timers.
+ // timers in memory now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to
+ // 4 prefixed with timer, timestamp is in order;
+ // timers in store now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to 9
+ // prefixed with timer, timestamp is in order;
+ for (int i = 0; i < 3; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "lateTimer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // there are 11 timers in state now.
+ // watermark 5 comes, so 6 timers will be evicted because their timestamp is less than 5.
+ // memory will be reloaded once to have 5 to 8 left (reload to have 4 to 8, but 4 is evicted), 5
+ // to 9 left in store.
+ // all of them are in order for firing.
+ timerInternalsFactory.setInputWatermark(new Instant(5));
+ lastTimestamp = 0;
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(6, readyTimers.size());
+ assertEquals(4, timerInternalsFactory.getEventTimeBuffer().size());
+
+ // watermark 10 comes, so all timers will be evicted in order.
+ timerInternalsFactory.setInputWatermark(new Instant(10));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(5, readyTimers.size());
+ assertEquals(0, timerInternalsFactory.getEventTimeBuffer().size());
+
+ store.close();
+ }
+
+ /** Test buffer could still be filled after restore to a non-full state. */
+ @Test
+ public void testEventTimeTimersMemoryBoundary5() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setEventTimerBufferSize(5);
+
+ final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+ // prepare (buffer capacity + 1) 6 timers.
+ // timers in memory now are timestamped from 0 - 4;
+ // timer in store now is timestamped 5.
+ TimerInternals.TimerData timer;
+ for (int i = 0; i < 6; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+
+ // watermark 5 comes, so the 5 timers timestamped 0 - 4 fire; timer5 is not yet ready
+ Collection<KeyedTimerData<String>> readyTimers;
+ timerInternalsFactory.setInputWatermark(new Instant(5));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertEquals(5, readyTimers.size());
+ // reloaded timer5
+ assertEquals(1, timerInternalsFactory.getEventTimeBuffer().size());
+
+ for (int i = 0; i < 7; i++) {
+ timer =
+ TimerInternals.TimerData.of(
+ "timer" + (i + 6),
+ nameSpace,
+ new Instant(i + 6),
+ new Instant(i + 6),
+ TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer);
+ }
+ // timers should go into buffer not state
+ assertEquals(5, timerInternalsFactory.getEventTimeBuffer().size());
+
+ // watermark 11 comes, so timers 5 - 10 are evicted in order; timers 11 and 12 remain.
+ timerInternalsFactory.setInputWatermark(new Instant(11));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ long lastTimestamp = 0;
+ for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+ final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+ assertTrue(lastTimestamp <= currentTimeStamp);
+ lastTimestamp = currentTimeStamp;
+ }
+ assertEquals(6, readyTimers.size());
+ assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
+
+ store.close();
+ }
+
@Test
public void testByteArray() {
ByteArray key1 = ByteArray.of("hello world".getBytes(StandardCharsets.UTF_8));
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
index 36317b1..867e7d4 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.runners.samza.translation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.util.Map;
+import java.util.Objects;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
@@ -32,6 +34,8 @@ import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
@@ -46,6 +50,7 @@ import org.apache.samza.job.yarn.YarnJobFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.junit.Test;
@@ -58,7 +63,7 @@ public class ConfigGeneratorTest {
private static final String JOB_FACTORY_CLASS = "job.factory.class";
@Test
- public void testBeamStoreConfig() {
+ public void testStatefulBeamStoreConfig() {
SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
options.setJobName("TestStoreConfig");
options.setRunner(SamzaRunner.class);
@@ -77,7 +82,7 @@ public class ConfigGeneratorTest {
RocksDbKeyValueStorageEngineFactory.class.getName(),
config.get("stores.beamStore.factory"));
assertEquals("byteArraySerde", config.get("stores.beamStore.key.serde"));
- assertEquals("byteSerde", config.get("stores.beamStore.msg.serde"));
+ assertEquals("stateValueSerde", config.get("stores.beamStore.msg.serde"));
assertNull(config.get("stores.beamStore.changelog"));
options.setStateDurable(true);
@@ -88,6 +93,36 @@ public class ConfigGeneratorTest {
}
@Test
+ public void testStatelessBeamStoreConfig() {
+ SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ options.setJobName("TestStoreConfig");
+ options.setRunner(SamzaRunner.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(Impulse.create()).apply(Filter.by(Objects::nonNull));
+
+ pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final ConfigBuilder configBuilder = new ConfigBuilder(options);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config = configBuilder.build();
+
+ assertEquals(
+ InMemoryKeyValueStorageEngineFactory.class.getName(),
+ config.get("stores.beamStore.factory"));
+ assertEquals("byteArraySerde", config.get("stores.beamStore.key.serde"));
+ assertEquals("stateValueSerde", config.get("stores.beamStore.msg.serde"));
+ assertNull(config.get("stores.beamStore.changelog"));
+
+ options.setStateDurable(true);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config2 = configBuilder.build();
+ // For stateless jobs, ignore state durable pipeline option.
+ assertNull(config2.get("stores.beamStore.changelog"));
+ }
+
+ @Test
public void testSamzaLocalExecutionEnvironmentConfig() {
SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
options.setJobName("TestEnvConfig");
@@ -207,7 +242,7 @@ public class ConfigGeneratorTest {
RocksDbKeyValueStorageEngineFactory.class.getName(),
config.get("stores.testState.factory"));
assertEquals("byteArraySerde", config.get("stores.testState.key.serde"));
- assertEquals("byteSerde", config.get("stores.testState.msg.serde"));
+ assertEquals("stateValueSerde", config.get("stores.testState.msg.serde"));
assertNull(config.get("stores.testState.changelog"));
options.setStateDurable(true);
@@ -216,4 +251,49 @@ public class ConfigGeneratorTest {
assertEquals(
"TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog"));
}
+
+ @Test
+ public void testDuplicateStateIdConfig() {
+ SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ options.setJobName("TestStoreConfig");
+ options.setRunner(SamzaRunner.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline
+ .apply(
+ Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
+ .apply(
+ ParDo.of(
+ new DoFn<KV<String, String>, KV<String, String>>() {
+ private static final String testState = "testState";
+
+ @StateId(testState)
+ private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context, @StateId(testState) ValueState<Integer> state) {
+ context.output(context.element());
+ }
+ }))
+ .apply(
+ ParDo.of(
+ new DoFn<KV<String, String>, Void>() {
+ private static final String testState = "testState";
+
+ @StateId(testState)
+ private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context, @StateId(testState) ValueState<Integer> state) {}
+ }));
+
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final ConfigBuilder configBuilder = new ConfigBuilder(options);
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder));
+ }
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java
new file mode 100644
index 0000000..8827f1e
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.junit.Test;
+
+@SuppressWarnings({"rawtypes"})
+public class TranslationContextTest {
+ private final GenericInputDescriptor testInputDescriptor =
+ new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
+ .getInputDescriptor("test-input-1", mock(Serde.class));
+ MapFunction<Object, String> keyFn = m -> m.toString();
+ MapFunction<Object, Object> valueFn = m -> m;
+ private final String streamName = "testStream";
+ KVSerde<Object, Object> serde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+ StreamApplicationDescriptor streamApplicationDescriptor =
+ new StreamApplicationDescriptorImpl(
+ appDesc -> {
+ MessageStream inputStream = appDesc.getInputStream(testInputDescriptor);
+ inputStream.partitionBy(keyFn, valueFn, serde, streamName);
+ },
+ getConfig());
+ Map<PValue, String> idMap = new HashMap<>();
+ TranslationContext translationContext =
+ new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class));
+
+ @Test
+ public void testRegisterInputMessageStreams() {
+ final PCollection output = mock(PCollection.class);
+ List<String> topics = Arrays.asList("stream1", "stream2");
+ List inputDescriptors =
+ topics.stream()
+ .map(topicName -> createSamzaInputDescriptor(topicName, topicName))
+ .collect(Collectors.toList());
+
+ translationContext.registerInputMessageStreams(output, inputDescriptors);
+
+ assertNotNull(translationContext.getMessageStream(output));
+ }
+
+ public GenericInputDescriptor<KV<String, OpMessage<?>>> createSamzaInputDescriptor(
+ String systemName, String streamId) {
+ final Serde<KV<String, OpMessage<?>>> kvSerde =
+ KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+ return new GenericSystemDescriptor(systemName, "factoryClass")
+ .getInputDescriptor(streamId, kvSerde);
+ }
+
+ private static Config getConfig() {
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put("job.name", "testJobName");
+ configMap.put("job.id", "testJobId");
+ return new MapConfig(configMap);
+ }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java
new file mode 100644
index 0000000..357fc6f
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for {@linkplain FutureUtils}. */
+public final class FutureUtilsTest {
+ private static final List<String> RESULTS = ImmutableList.of("hello", "world");
+
+ @Test
+ public void testFlattenFuturesForCollection() {
+ CompletionStage<Collection<String>> resultFuture =
+ FutureUtils.flattenFutures(
+ ImmutableList.of(
+ CompletableFuture.completedFuture("hello"),
+ CompletableFuture.completedFuture("world")));
+
+ CompletionStage<Void> validationFuture =
+ resultFuture.thenAccept(
+ actualResults -> {
+ Assert.assertEquals(
+ "Expected flattened results to contain {hello, world}", RESULTS, actualResults);
+ });
+
+ validationFuture.toCompletableFuture().join();
+ }
+
+ @Test
+ public void testFlattenFuturesForFailedFuture() {
+ CompletionStage<Collection<String>> resultFuture =
+ FutureUtils.flattenFutures(
+ ImmutableList.of(
+ CompletableFuture.completedFuture("hello"),
+ createFailedFuture(new RuntimeException())));
+
+ CompletionStage<Void> validationFuture =
+ resultFuture.handle(
+ (results, ex) -> {
+ Assert.assertTrue(
+ "Expected exception to be of RuntimeException", ex instanceof RuntimeException);
+ return null;
+ });
+
+ validationFuture.toCompletableFuture().join();
+ }
+
+ @Test
+ public void testWaitForAllFutures() {
+ CountDownLatch latch = new CountDownLatch(1);
+ CompletionStage<Collection<String>> resultFuture =
+ FutureUtils.flattenFutures(
+ ImmutableList.of(
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ return "";
+ }
+
+ return "hello";
+ }),
+ CompletableFuture.supplyAsync(
+ () -> {
+ latch.countDown();
+ return "world";
+ })));
+
+ CompletionStage<Void> validationFuture =
+ resultFuture.thenAccept(
+ actualResults -> {
+ Assert.assertEquals(
+ "Expected flattened results to contain {hello, world}", RESULTS, actualResults);
+ });
+
+ validationFuture.toCompletableFuture().join();
+ }
+
+ private static CompletionStage<String> createFailedFuture(Throwable t) {
+ CompletableFuture<String> future = new CompletableFuture<>();
+ future.completeExceptionally(t);
+ return future;
+ }
+}
diff --git a/website/www/site/content/en/documentation/runners/samza.md b/website/www/site/content/en/documentation/runners/samza.md
index 355d8c0..fc93254 100644
--- a/website/www/site/content/en/documentation/runners/samza.md
+++ b/website/www/site/content/en/documentation/runners/samza.md
@@ -162,6 +162,11 @@ When executing your pipeline with the Samza Runner, you can use the following pi
<td><code>5000</code></td>
</tr>
<tr>
+ <td><code>eventTimerBufferSize</code></td>
+ <td>The maximum number of event-time timers to buffer in memory for a PTransform</td>
+ <td><code>5000</code></td>
+</tr>
+<tr>
<td><code>maxSourceParallelism</code></td>
<td>The maximum parallelism allowed for any data source.</td>
<td><code>1</code></td>